This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 688bfa397741dcfb39eee21097c83774b4a913ec Author: Xiaoyu Hou <[email protected]> AuthorDate: Fri Jul 1 10:19:19 2022 +0800 [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238) * Fix getInternalStats occasional lack of LeaderInfo again * Make futures as wildcard (cherry picked from commit a9af98050c0c50d3d9e25f1db50bf2df7584d2ba) --- .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++----------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a5cffa22939..19706585531 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2000,39 +2000,42 @@ public class PersistentTopic extends AbstractTopic stats.state = ml.getState().toString(); stats.ledgers = Lists.newArrayList(); - List<CompletableFuture<String>> futures = Lists.newArrayList(); + Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet(); CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync(); - futures.add(availableBookiesFuture.handle((strings, throwable) -> null)); - availableBookiesFuture.whenComplete((bookies, e) -> { - if (e != null) { - log.error("[{}] Failed to fetch available bookies.", topic, e); - statFuture.completeExceptionally(e); - } else { - ml.getLedgersInfo().forEach((id, li) -> { - LedgerInfo info = new LedgerInfo(); - info.ledgerId = li.getLedgerId(); - info.entries = li.getEntries(); - info.size = li.getSize(); - info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); - stats.ledgers.add(info); - if (includeLedgerMetadata) { - futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { - if (ex == null) { - info.metadata = lMetadata; - } - return null; - })); - futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> { - if (ex == null) { - info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString) - .collect(Collectors.toList())); + futures.add( + availableBookiesFuture + .whenComplete((bookies, e) -> { + if (e != null) { + log.error("[{}] Failed to fetch available bookies.", topic, e); + statFuture.completeExceptionally(e); + } else { + ml.getLedgersInfo().forEach((id, li) -> { + LedgerInfo info = new LedgerInfo(); + info.ledgerId = li.getLedgerId(); + info.entries = li.getEntries(); + info.size = li.getSize(); + info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + stats.ledgers.add(info); + if (includeLedgerMetadata) { + futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { + if (ex == null) { + info.metadata = lMetadata; + } + return null; + })); + futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> { + if (ex == null) { + info.underReplicated = + !bookies.containsAll(ensembles.stream().map(BookieId::toString) + .collect(Collectors.toList())); + } + return null; + })); } - return null; - })); + }); } - }); - } - }); + }) + ); // Add ledger info for compacted topic ledger if exist. LedgerInfo info = new LedgerInfo(); @@ -2148,16 +2151,11 @@ public class PersistentTopic extends AbstractTopic } else { schemaStoreLedgersFuture.complete(null); } - schemaStoreLedgersFuture.thenRun(() -> { - if (futures != null) { - FutureUtil.waitForAll(futures).handle((res, ex) -> { - statFuture.complete(stats); - return null; - }); - } else { + schemaStoreLedgersFuture.thenRun(() -> + FutureUtil.waitForAll(new ArrayList<>(futures)).handle((res, ex) -> { statFuture.complete(stats); - } - }).exceptionally(e -> { + return null; + })).exceptionally(e -> { statFuture.completeExceptionally(e); return null; });
