This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d2ff2936097f8920b4521923685b7623eda5fcdc
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Jun 21 09:39:10 2022 +0200

    PIP-105: new API to get subscription properties (#16095)

    (cherry picked from commit face8bb0ab48d09d5aa293cd9bbdab66232207da)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 114 +++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  36 +++++++
 .../broker/admin/AdminApiSubscriptionTest.java     |  36 +++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  16 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  29 ++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   4 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  20 ++++
 .../pulsar/tests/integration/cli/CLITest.java      |  26 +++++
 8 files changed, 281 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 698da80265d..956679df95b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1636,6 +1636,35 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                         String subName,
+                                                                         boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenApply((Topic topic) -> {
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getSubNotFoundErrorMessage(topicName.toString(), subName));
+                    }
+                    return sub.getSubscriptionProperties();
+                }).thenAccept((Map<String, String> properties) -> {
+                    if (properties == null) {
+                        properties = Collections.emptyMap();
+                    }
+                    asyncResponse.resume(Response.ok(properties).build());
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    // If the exception is not a redirect exception we need to log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get properties of {} {}", clientAppId(), topicName, subName, cause);
+                    }
+                    asyncResponse.resume(new RestException(cause));
+                    return null;
+                });
+    }
+
     protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName,
                                                         boolean authoritative) {
         CompletableFuture<Void> future;
@@ -2384,6 +2413,91 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
+    protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
+                                                     boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (topicName.isPartitioned()) {
+                internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                        authoritative);
+            } else {
+                getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenAcceptAsync(partitionMetadata -> {
+                    if (partitionMetadata.partitions > 0) {
+                        final List<CompletableFuture<Map<String, String>>> futures = Lists.newArrayList();
+
+                        for (int i = 0; i < partitionMetadata.partitions; i++) {
+                            TopicName topicNamePartition = topicName.getPartition(i);
+                            try {
+                                futures.add(pulsar().getAdminClient().topics()
+                                        .getSubscriptionPropertiesAsync(topicNamePartition.toString(),
+                                                subName));
+                            } catch (Exception e) {
+                                log.error("[{}] Failed to get properties for subscription {} {}",
+                                        clientAppId(), topicNamePartition, subName,
+                                        e);
+                                asyncResponse.resume(new RestException(e));
+                                return;
+                            }
+                        }
+
+                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                            if (exception != null) {
+                                Throwable t = exception.getCause();
+                                if (t instanceof NotFoundException) {
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
+                                    return null;
+                                } else {
+                                    log.error("[{}] Failed to get properties for subscription {} {}",
+                                            clientAppId(), topicName, subName, t);
+                                    asyncResponse.resume(new RestException(t));
+                                    return null;
+                                }
+                            }
+
+                            Map<String, String> aggregatedResult = new HashMap<>();
+                            futures.forEach(f -> {
+                                // in theory all the partitions have the same properties
+                                try {
+                                    aggregatedResult.putAll(f.get());
+                                } catch (Exception impossible) {
+                                    // we already waited for this Future
+                                    asyncResponse.resume(new RestException(impossible));
+                                }
+                            });
+
+                            asyncResponse.resume(Response.ok(aggregatedResult).build());
+                            return null;
+                        });
+                    } else {
+                        internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
+                                authoritative);
+                    }
+                }, pulsar().getExecutor()).exceptionally(ex -> {
+                    log.error("[{}] Failed to get properties for subscription {} from topic {}",
+                            clientAppId(), subName, topicName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+            }
+        }).exceptionally(ex -> {
+            // If the exception is not a redirect exception we need to log it.
+            if (!isRedirectException(ex)) {
+                log.error("[{}] Failed to get properties for subscription {} from topic {}",
+                        clientAppId(), subName, topicName, ex);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName,
             boolean authoritative, MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 56e3799c475..00cc19e0c98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1564,6 +1564,42 @@ public class PersistentTopics extends PersistentTopicsBase {
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
+    @ApiOperation(value = "Return all the properties on the given subscription")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + " subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void getSubscriptionProperties(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetSubscriptionProperties(asyncResponse, decode(encodedSubName),
+                    authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index f7af28ddf41..521ef4df1c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -152,6 +152,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName);
                 assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+                assertEquals(value, props.get("foo"));
             }
 
             // properties are never null, but an empty map
@@ -159,6 +162,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName2);
                 assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName2);
+                assertTrue(props.isEmpty());
             }
 
             // aggregated properties
@@ -166,12 +172,21 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                     .getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
         } else {
             SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
             SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
             assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+
+            Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
+            assertTrue(props2.isEmpty());
         }
 
         // clear the properties on subscriptionName
@@ -183,6 +198,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName);
                 assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+                assertTrue(props.isEmpty());
             }
 
             // aggregated properties
@@ -190,9 +208,15 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                     .getSubscriptions().get(subscriptionName);
             assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertTrue(props.isEmpty());
+
         } else {
             SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
             assertTrue(subscriptionStats.getSubscriptionProperties().isEmpty());
+
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertTrue(props.isEmpty());
         }
 
         // update the properties on subscriptionName
@@ -204,6 +228,9 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                 SubscriptionStats subscriptionStats = admin.topics().getStats(topic + "-partition-" + i)
                         .getSubscriptions().get(subscriptionName);
                 assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
+
+                Map<String, String> props = admin.topics().getSubscriptionProperties(topic + "-partition-" + i, subscriptionName);
+                assertEquals(value, props.get("foo"));
             }
 
             // aggregated properties
@@ -211,12 +238,21 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
                     .getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
         } else {
             SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName);
             assertEquals(value, subscriptionStats.getSubscriptionProperties().get("foo"));
 
+            Map<String, String> props = admin.topics().getSubscriptionProperties(topic, subscriptionName);
+            assertEquals(value, props.get("foo"));
+
             SubscriptionStats subscriptionStats2 = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName2);
             assertTrue(subscriptionStats2.getSubscriptionProperties().isEmpty());
+
+            Map<String, String> props2 = admin.topics().getSubscriptionProperties(topic, subscriptionName2);
+            assertTrue(props2.isEmpty());
         }
     }
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 73f9a199a1b..2fa14e54434 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1764,6 +1764,15 @@ public interface Topics {
     void updateSubscriptionProperties(String topic, String subName, Map<String, String> subscriptionProperties)
             throws PulsarAdminException;
 
+    /**
+     * Get the properties of a topic subscription as a key/value map.
+     * @param topic topic name
+     * @param subName subscription name
+     * @throws PulsarAdminException if the request fails
+     */
+    Map<String, String> getSubscriptionProperties(String topic, String subName)
+            throws PulsarAdminException;
+
     /**
      * Reset cursor position on a topic subscription.
      *
@@ -1798,6 +1807,13 @@ public interface Topics {
     CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
                                                               Map<String, String> subscriptionProperties);
 
+    /**
+     * Get the properties of a topic subscription asynchronously, as a key/value map.
+     * @param topic topic name
+     * @param subName subscription name
+     */
+    CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName);
+
     /**
      * Reset cursor position on a topic subscription.
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index dbdaffd9e5c..a2360fb5b73 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1179,6 +1179,12 @@ public class TopicsImpl extends BaseResource implements Topics {
         sync(() -> updateSubscriptionPropertiesAsync(topic, subName, subscriptionProperties));
     }
 
+    @Override
+    public Map<String, String> getSubscriptionProperties(String topic, String subName)
+            throws PulsarAdminException {
+        return sync(() -> getSubscriptionPropertiesAsync(topic, subName));
+    }
+
     @Override
     public CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String subName,
                                                                      Map<String, String> subscriptionProperties) {
@@ -1192,6 +1198,29 @@ public class TopicsImpl extends BaseResource implements Topics {
         return asyncPutRequest(path, Entity.entity(subscriptionProperties, MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public CompletableFuture<Map<String, String>> getSubscriptionPropertiesAsync(String topic, String subName) {
+        TopicName tn = validateTopic(topic);
+        String encodedSubName = Codec.encode(subName);
+        WebTarget path = topicPath(tn, "subscription", encodedSubName,
+                "properties");
+        final CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Map<String, String>>() {
+
+                    @Override
+                    public void completed(Map<String, String> response) {
+                        future.complete(response);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     @Override
     public void resetCursor(String topic, String subName, MessageId messageId
             , boolean isExcluded) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 8e7e339b536..067e08b5599 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1422,6 +1422,10 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
         verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
 
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("get-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1"));
+        verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1");
+
         cmdTopics = new CmdTopics(() -> admin);
         props = new HashMap<>();
         props.put("a", "b");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 8fc8d5646d7..562d7993f5c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 @Getter
@@ -99,6 +100,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("unsubscribe", new DeleteSubscription());
         jcommander.addCommand("create-subscription", new CreateSubscription());
         jcommander.addCommand("update-subscription-properties", new UpdateSubscriptionProperties());
+        jcommander.addCommand("get-subscription-properties", new GetSubscriptionProperties());
 
         jcommander.addCommand("stats", new GetStats());
         jcommander.addCommand("stats-internal", new GetInternalStats());
@@ -968,6 +970,24 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the properties of a subscription on a topic")
+    private class GetSubscriptionProperties extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-s",
+                "--subscription" }, description = "Subscription to describe", required = true)
+        private String subscriptionName;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            Map<String, String> result = getTopics().getSubscriptionProperties(topic, subscriptionName);
+            // Ensure we are using JSON and not Java toString()
+            System.out.println(ObjectMapperFactory.getThreadLocal().writeValueAsString(result));
+        }
+    }
+
     @Parameters(commandDescription = "Reset position for subscription to a position that is closest to "
             + "timestamp or messageId.")
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index a1e417ca547..cda2347b4dc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -207,6 +207,18 @@ public class CLITest extends PulsarTestSuite {
             );
             resultUpdate.assertNoOutput();
 
+            ContainerExecResult resultGet = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "get-subscription-properties",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            assertEquals(
+                    resultGet.getStdout().trim(), "{\"a\":\"e\"}",
+                    "unexpected output " + resultGet.getStdout() + " - error " + resultGet.getStderr());
+
             ContainerExecResult resultClear = container.execCmd(
                     PulsarCluster.ADMIN_SCRIPT,
                     "topics",
@@ -217,6 +229,20 @@ public class CLITest extends PulsarTestSuite {
                     "" + subscriptionPrefix + i
             );
             resultClear.assertNoOutput();
+
+            ContainerExecResult resultGetAfterClear = container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "get-subscription-properties",
+                    "persistent://public/default/" + topic,
+                    "--subscription",
+                    "" + subscriptionPrefix + i
+            );
+            assertEquals(
+                    resultGetAfterClear.getStdout().trim(), "{}",
+                    "unexpected output " + resultGetAfterClear.getStdout()
+                            + " - error " + resultGetAfterClear.getStderr());
+
             i++;
         }
     }
