This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5302c8bb160f157c2e395063e8a6b0b2538469de Author: Xiaoyu Hou <[email protected]> AuthorDate: Sun Aug 7 16:11:49 2022 +0800 [Improve][Broker]Reduce PartitionedStats local REST call (#16916) --- .../broker/admin/impl/PersistentTopicsBase.java | 35 ++++++++++++++-------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 6f48145b0ed..4b635f205e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1342,17 +1342,29 @@ public class PersistentTopicsBase extends AdminResource { return; } PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata); - List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList(); + List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions); for (int i = 0; i < partitionMetadata.partitions; i++) { - try { - topicStatsFutureList - .add(pulsar().getAdminClient().topics().getStatsAsync( - (topicName.getPartition(i).toString()), getPreciseBacklog, subscriptionBacklogSize, - getEarliestTimeInBacklog)); - } catch (PulsarServerException e) { - asyncResponse.resume(new RestException(e)); - return; - } + TopicName partition = topicName.getPartition(i); + topicStatsFutureList.add( + pulsar().getNamespaceService() + .isServiceUnitOwnedAsync(partition) + .thenCompose(owned -> { + if (owned) { + return getTopicReferenceAsync(partition) + .thenApply(ref -> + ref.getStats(getPreciseBacklog, subscriptionBacklogSize, + getEarliestTimeInBacklog)); + } else { + try { + return pulsar().getAdminClient().topics().getStatsAsync( + partition.toString(), getPreciseBacklog, subscriptionBacklogSize, + getEarliestTimeInBacklog); + } catch (PulsarServerException e) { + return FutureUtil.failedFuture(e); + } + } + }) + ); } FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> { @@ -1363,8 +1375,7 @@ public class PersistentTopicsBase extends AdminResource { try { stats.add(statFuture.get()); if (perPartition) { - stats.getPartitions().put(topicName.getPartition(i).toString(), - (TopicStatsImpl) statFuture.get()); + stats.getPartitions().put(topicName.getPartition(i).toString(), statFuture.get()); } } catch (Exception e) { asyncResponse.resume(new RestException(e));
