codelipenghui commented on a change in pull request #11355:
URL: https://github.com/apache/pulsar/pull/11355#discussion_r672253045
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1030,34 +1032,52 @@ protected void internalGetSubscriptions(AsyncResponse
asyncResponse, boolean aut
false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
- // get the subscriptions only from the 1st partition
- // since all the other partitions will have the same
- // subscriptions
-
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
- .whenComplete((r, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to get list of
subscriptions for {}: {}", clientAppId(),
- topicName, ex.getMessage());
-
- if (ex instanceof
PulsarAdminException) {
- PulsarAdminException pae =
(PulsarAdminException) ex;
- if (pae.getStatusCode() ==
Status.NOT_FOUND.getStatusCode()) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Internal topics have
not been generated yet"));
- return;
- } else {
- asyncResponse.resume(new
RestException(pae));
- return;
- }
- } else {
- asyncResponse.resume(new
RestException(ex));
- return;
- }
+ final Set<String> subscriptions =
Sets.newConcurrentHashSet();
+ final List<CompletableFuture<Object>>
subscriptionFutures = Lists.newArrayList();
+ if (topicName.getDomain() == TopicDomain.persistent) {
+ String path =
String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
+ List<String> children =
getLocalPolicies().getChildren(path);
Review comment:
This will lead to the broker load all the managed ledger znode under a
namespace. My suggestion is we can just generate the partition name because we
already know how many partitions the partitioned topic have.
Of cause, the partitions might not be created yet. so can just handle the
topic not exist exception when getting the subscriptions of the partition.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1030,34 +1032,52 @@ protected void internalGetSubscriptions(AsyncResponse
asyncResponse, boolean aut
false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
- // get the subscriptions only from the 1st partition
- // since all the other partitions will have the same
- // subscriptions
-
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
- .whenComplete((r, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to get list of
subscriptions for {}: {}", clientAppId(),
- topicName, ex.getMessage());
-
- if (ex instanceof
PulsarAdminException) {
- PulsarAdminException pae =
(PulsarAdminException) ex;
- if (pae.getStatusCode() ==
Status.NOT_FOUND.getStatusCode()) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Internal topics have
not been generated yet"));
- return;
- } else {
- asyncResponse.resume(new
RestException(pae));
- return;
- }
- } else {
- asyncResponse.resume(new
RestException(ex));
- return;
- }
+ final Set<String> subscriptions =
Sets.newConcurrentHashSet();
+ final List<CompletableFuture<Object>>
subscriptionFutures = Lists.newArrayList();
+ if (topicName.getDomain() == TopicDomain.persistent) {
+ String path =
String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
+ List<String> children =
getLocalPolicies().getChildren(path);
Review comment:
> I have a small suggestion, it is use getTopicReference to get
subscribers directly Instead of use adminClient, this can reduce RPC calls.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1030,34 +1032,52 @@ protected void internalGetSubscriptions(AsyncResponse
asyncResponse, boolean aut
false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
- // get the subscriptions only from the 1st partition
- // since all the other partitions will have the same
- // subscriptions
-
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
- .whenComplete((r, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to get list of
subscriptions for {}: {}", clientAppId(),
- topicName, ex.getMessage());
-
- if (ex instanceof
PulsarAdminException) {
- PulsarAdminException pae =
(PulsarAdminException) ex;
- if (pae.getStatusCode() ==
Status.NOT_FOUND.getStatusCode()) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Internal topics have
not been generated yet"));
- return;
- } else {
- asyncResponse.resume(new
RestException(pae));
- return;
- }
- } else {
- asyncResponse.resume(new
RestException(ex));
- return;
- }
+ final Set<String> subscriptions =
Sets.newConcurrentHashSet();
+ final List<CompletableFuture<Object>>
subscriptionFutures = Lists.newArrayList();
+ if (topicName.getDomain() == TopicDomain.persistent) {
+ String path =
String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
+ List<String> children =
getLocalPolicies().getChildren(path);
Review comment:
> I have a small suggestion, it is use getTopicReference to get
subscribers directly Instead of use adminClient, this can reduce RPC calls.
Not all partitions owned by the requested broker.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1030,34 +1030,55 @@ protected void internalGetSubscriptions(AsyncResponse
asyncResponse, boolean aut
false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
try {
- // get the subscriptions only from the 1st partition
- // since all the other partitions will have the same
- // subscriptions
-
pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
- .whenComplete((r, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to get list of
subscriptions for {}: {}", clientAppId(),
- topicName, ex.getMessage());
-
- if (ex instanceof
PulsarAdminException) {
- PulsarAdminException pae =
(PulsarAdminException) ex;
- if (pae.getStatusCode() ==
Status.NOT_FOUND.getStatusCode()) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Internal topics have
not been generated yet"));
- return;
- } else {
- asyncResponse.resume(new
RestException(pae));
- return;
- }
- } else {
- asyncResponse.resume(new
RestException(ex));
- return;
- }
+ final Set<String> subscriptions =
Sets.newConcurrentHashSet();
+ final List<CompletableFuture<Object>>
subscriptionFutures = Lists.newArrayList();
+ if (topicName.getDomain() == TopicDomain.persistent) {
+ List<String> activeTopics = Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions;
i++) {
+ String path =
String.format("/managed-ledgers/%s/%s/%s", namespaceName.toString(),
+ domain(),
topicName.getPartition(i).getEncodedLocalName());
+ boolean exists =
getLocalPolicies().exists(path);
Review comment:
`exists()` is not an async method. We should avoid run a sync method in
an async callback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]