This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9dd4057d5bccdc93d29cab49cdb969f507a56739 Author: Enrico Olivelli <[email protected]> AuthorDate: Fri Jun 3 13:44:16 2022 +0200 PIP-105 add support for updating the Subscription properties (#15751) * PIP-105 add support for updating the Subscription properties * Implement command update-subscription-properties * Add tests * Add volatile * Fix PersistentTopicTest * PIP-105: Store Subscription properties * Fix FilterEntryTest * Add volatile * Fix PersistentTopicTest * fix ServerCnxTest test * Switch from POST to PUT * rename to /properties * Apply suggestions from code review Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 8e77e88cc54c05fa4f8b360be499c3da61607a66) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +- .../broker/admin/impl/PersistentTopicsBase.java | 104 +++++++++++++++++++++ .../pulsar/broker/admin/v2/PersistentTopics.java | 37 ++++++++ .../broker/admin/AdminApiSubscriptionTest.java | 45 +++++++++ .../org/apache/pulsar/client/admin/Topics.java | 21 +++++ .../pulsar/client/admin/internal/TopicsImpl.java | 19 ++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 16 ++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 36 +++++++ .../pulsar/tests/integration/cli/CLITest.java | 25 ++++- 9 files changed, 305 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1092ca0a89d..424874a8dfa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -324,7 +325,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) { CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>(); - ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() { + ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() { @Override public void operationComplete(ManagedCursorInfo info, Stat stat) { ManagedCursorInfo copy = ManagedCursorInfo @@ -333,7 +334,7 @@ public class ManagedCursorImpl implements ManagedCursor { .addAllCursorProperties(buildStringPropertiesMap(cursorProperties)) .build(); ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), - name, copy, stat, new MetaStoreCallback<>() { + name, copy, stat, new MetaStoreCallback<Void>() { @Override public void operationComplete(Void result, Stat stat) { log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4228ac6254d..f118172045d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1556,6 +1556,32 @@ public class PersistentTopicsBase extends AdminResource { }); } + private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse, + String subName, Map<String, String> subscriptionProperties, + boolean authoritative) { + validateTopicOwnershipAsync(topicName, authoritative) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME)) + .thenCompose(__ -> { + Topic topic = getTopicReference(topicName); + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + throw new RestException(Status.NOT_FOUND, "Subscription not found"); + } + return sub.updateSubscriptionProperties(subscriptionProperties); + }).thenRun(() -> { + log.info("[{}][{}] Updated subscription {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause); + } + asyncResponse.resume(new RestException(cause)); + return null; + }); + } + protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { CompletableFuture<Void> future; @@ -2276,6 +2302,84 @@ public class PersistentTopicsBase extends AdminResource { }); } + protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName, + Map<String, String> subscriptionProperties, + boolean authoritative) { + CompletableFuture<Void> future; + if (topicName.isGlobal()) { + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); + } + + future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { + if (topicName.isPartitioned()) { + internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, + subscriptionProperties, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, + authoritative, false).thenAcceptAsync(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List<CompletableFuture<Void>> futures = Lists.newArrayList(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics() + .updateSubscriptionPropertiesAsync(topicNamePartition.toString(), + subName, subscriptionProperties)); + } catch (Exception e) { + log.error("[{}] Failed to update properties for subscription {} {}", + clientAppId(), topicNamePartition, subName, + e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else if (t instanceof PreconditionFailedException) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Subscription has active connected consumers")); + return null; + } else { + log.error("[{}] Failed to update properties for subscription {} {}", + clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, + subscriptionProperties, authoritative); + } + }, pulsar().getExecutor()).exceptionally(ex -> { + log.error("[{}] Failed to update properties for subscription {} from topic {}", + clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + }).exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update subscription {} from topic {}", + clientAppId(), subName, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, MessageIdImpl messageId, boolean isExcluded, int batchIndex) { CompletableFuture<Void> ret; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index fd5cb75b10b..754f54ca515 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1464,6 +1464,43 @@ public class PersistentTopics extends PersistentTopicsBase { } } + @PUT + @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties") + @ApiOperation(value = "Replaces all the properties on the given subscription") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Method Not Allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void updateSubscriptionProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Subscription to update", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "The new properties") Map<String, String> subscriptionProperties, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + try { + validateTopicName(tenant, namespace, encodedTopic); + internalUpdateSubscriptionProperties(asyncResponse, decode(encodedSubName), + subscriptionProperties, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + @POST @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor") @ApiOperation(value = "Reset subscription to message position closest to given position.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java index 8d25c61e1dd..f7af28ddf41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java @@ -174,5 +174,50 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty()); } + // clear the properties on subscriptionName + admin.topics().updateSubscriptionProperties(topic, subscriptionName, new HashMap<>()); + + if (partitioned) { + PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic); + for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) + .getSubscriptions().get(subscriptionName); + assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); + } + + // aggregated properties + SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false) + .getSubscriptions().get(subscriptionName); + assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); + + } else { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty()); + } + + // update the properties on subscriptionName + admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties); + + if (partitioned) { + PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic); + for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i) + .getSubscriptions().get(subscriptionName); + assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); + } + + // aggregated properties + SubscriptionStats subscriptionStats = admin.topics().getPartitionedStats(topic, false) + .getSubscriptions().get(subscriptionName); + assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); + + } else { + SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo")); + + SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2); + assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty()); + } + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 7b533030961..eab3b8041f3 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1737,6 +1737,17 @@ public interface Topics { */ void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException; + /** + * Update Subscription Properties on a topic subscription. + * The new properties will override the existing values, properties that are not passed will be removed. + * @param topic + * @param subName + * @param subscriptionProperties + * @throws PulsarAdminException + */ + void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties) + throws PulsarAdminException; + /** * Reset cursor position on a topic subscription. * @@ -1761,6 +1772,16 @@ public interface Topics { */ CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded); + /** + * Update Subscription Properties on a topic subscription. + * The new properties will override the existing values, properties that are not passed will be removed. + * @param topic + * @param subName + * @param subscriptionProperties + */ + CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName, + Map<String, String> subscriptionProperties); + /** * Reset cursor position on a topic subscription. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 1667cc4f90a..a5f5edae290 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1147,6 +1147,25 @@ public class TopicsImpl extends BaseResource implements Topics { sync(() -> resetCursorAsync(topic, subName, messageId)); } + @Override + public void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties) + throws PulsarAdminException { + sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties)); + } + + @Override + public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName, + Map<String, String> subscriptionProperties) { + TopicName tn = validateTopic(topic); + String encodedSubName = Codec.encode(subName); + WebTarget path = topicPath(tn, "subscription", encodedSubName, + "properties"); + if (subscriptionProperties == null) { + subscriptionProperties = new HashMap<>(); + } + return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON)); + } + @Override public void resetCursor(String topic, String subName, MessageId messageId , boolean isExcluded) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 23ad1a04f84..ca129e54971 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1418,6 +1418,22 @@ public class PulsarAdminToolTest { props.put("a", "b"); verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r")); + verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null); + + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear")); + verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>()); + + cmdTopics = new CmdTopics(() -> admin); + props = new HashMap<>(); + props.put("a", "b"); + props.put("c", "d"); + cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 -p a=b -p c=d")); + verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", props); + + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 883617f89b9..7c3736616e9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -98,6 +98,7 @@ public class CmdTopics extends CmdBase { jcommander.addCommand("subscriptions", new ListSubscriptions()); jcommander.addCommand("unsubscribe", new DeleteSubscription()); jcommander.addCommand("create-subscription", new CreateSubscription()); + jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties()); jcommander.addCommand("stats", new GetStats()); jcommander.addCommand("stats-internal", new GetInternalStats()); @@ -919,6 +920,41 @@ public class CmdTopics extends CmdBase { } } + @Parameters(commandDescription = "Update the properties of a subscription on a topic") + private class UpdateSubscriptionProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription to update", required = true) + private String subscriptionName; + + @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)", + required = false) + private java.util.List<String> properties; + + @Parameter(names = {"--clear", "-c"}, description = "Remove all properties", + required = false) + private boolean clear; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + Map<String, String> map = parseListKeyValueMap(properties); + if (map == null) { + map = Collections.emptyMap(); + } + if ((map.isEmpty()) && !clear) { + throw new ParameterException("If you want to clear the properties you have to use --clear"); + } + if (clear && !map.isEmpty()) { + throw new ParameterException("If you set --clear then you should not pass any properties"); + } + getTopics().updateSubscriptionProperties(topic, subscriptionName, map); + } + } + + @Parameters(commandDescription = "Reset position for subscription to a position that is closest to " + "timestamp or messageId.") private class ResetCursor extends CliCommand { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java index b0da409e0fe..a1e417ca547 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java @@ -174,7 +174,7 @@ public class CLITest extends PulsarTestSuite { } @Test - public void testCreateSubscriptionWithPropertiesCommand() throws Exception { + public void testCreateUpdateSubscriptionWithPropertiesCommand() throws Exception { String topic = "testCreateSubscriptionCommmand"; String subscriptionPrefix = "subscription-"; @@ -194,6 +194,29 @@ public class CLITest extends PulsarTestSuite { "" + subscriptionPrefix + i ); result.assertNoOutput(); + + ContainerExecResult resultUpdate = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "update-subscription-properties", + "-p", + "a=e", + "persistent://public/default/" + topic, + "--subscription", + "" + subscriptionPrefix + i + ); + resultUpdate.assertNoOutput(); + + ContainerExecResult resultClear = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "update-subscription-properties", + "-c", + "persistent://public/default/" + topic, + "--subscription", + "" + subscriptionPrefix + i + ); + resultClear.assertNoOutput(); i++; } }
