This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cbb060ec3e6486ee5a6df601f2724378e07b286c Author: Xiaoyu Hou <[email protected]> AuthorDate: Fri Jul 1 10:19:19 2022 +0800 [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238) * Fix getInternalStats occasional lack of LeaderInfo again * Make futures as wildcard (cherry picked from commit a9af98050c0c50d3d9e25f1db50bf2df7584d2ba) --- .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++----------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index be5d6095e01..5be91078b14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1972,40 +1972,43 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal stats.state = ml.getState().toString(); stats.ledgers = Lists.newArrayList(); - List<CompletableFuture<String>> futures = Lists.newArrayList(); + Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet(); CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync(); - futures.add(availableBookiesFuture.handle((strings, throwable) -> null)); - availableBookiesFuture.whenComplete((bookies, e) -> { - if (e != null) { - log.error("[{}] Failed to fetch available bookies.", topic, e); - statFuture.completeExceptionally(e); - } else { - ml.getLedgersInfo().forEach((id, li) -> { - LedgerInfo info = new LedgerInfo(); - info.ledgerId = li.getLedgerId(); - info.entries = li.getEntries(); - info.size = li.getSize(); - info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); - stats.ledgers.add(info); - if (includeLedgerMetadata) { - futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { - if (ex == null) { - info.metadata = lMetadata; - } - return null; - })); - futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> { - if (ex == null) { - info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString) - .collect(Collectors.toList())); + futures.add( + availableBookiesFuture + .whenComplete((bookies, e) -> { + if (e != null) { + log.error("[{}] Failed to fetch available bookies.", topic, e); + statFuture.completeExceptionally(e); + } else { + ml.getLedgersInfo().forEach((id, li) -> { + LedgerInfo info = new LedgerInfo(); + info.ledgerId = li.getLedgerId(); + info.entries = li.getEntries(); + info.size = li.getSize(); + info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + stats.ledgers.add(info); + if (includeLedgerMetadata) { + futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { + if (ex == null) { + info.metadata = lMetadata; + } + return null; + })); + futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> { + if (ex == null) { + info.underReplicated = + !bookies.containsAll(ensembles.stream().map(BookieId::toString) + .collect(Collectors.toList())); + } + return null; + })); } - return null; - })); + }); } - }); - } - }); + }) + ); // Add ledger info for compacted topic ledger if exist. LedgerInfo info = new LedgerInfo(); @@ -2121,16 +2124,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { schemaStoreLedgersFuture.complete(null); } - schemaStoreLedgersFuture.thenRun(() -> { - if (futures != null) { - FutureUtil.waitForAll(futures).handle((res, ex) -> { - statFuture.complete(stats); - return null; - }); - } else { + schemaStoreLedgersFuture.thenRun(() -> + FutureUtil.waitForAll(futures).handle((res, ex) -> { statFuture.complete(stats); - } - }).exceptionally(e -> { + return null; + })).exceptionally(e -> { statFuture.completeExceptionally(e); return null; });
