This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 688bfa397741dcfb39eee21097c83774b4a913ec
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Jul 1 10:19:19 2022 +0800

    [fix][broker] Fix getInternalStats occasional lack of LeaderInfo again (#16238)
    
    * Fix getInternalStats occasional lack of LeaderInfo again
    
    * Make the futures collection use a wildcard type (CompletableFuture<?>)
    
    (cherry picked from commit a9af98050c0c50d3d9e25f1db50bf2df7584d2ba)
---
 .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
 1 file changed, 37 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a5cffa22939..19706585531 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2000,39 +2000,42 @@ public class PersistentTopic extends AbstractTopic
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture = 
brokerService.pulsar().getAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> 
null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && 
li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        
futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        
futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = 
!bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", 
topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && 
li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                
futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                
futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            
!bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+        );
 
         // Add ledger info for compacted topic ledger if exist.
         LedgerInfo info = new LedgerInfo();
@@ -2148,16 +2151,11 @@ public class PersistentTopic extends AbstractTopic
         } else {
             schemaStoreLedgersFuture.complete(null);
         }
-        schemaStoreLedgersFuture.thenRun(() -> {
-            if (futures != null) {
-                FutureUtil.waitForAll(futures).handle((res, ex) -> {
-                    statFuture.complete(stats);
-                    return null;
-                });
-            } else {
+        schemaStoreLedgersFuture.thenRun(() ->
+            FutureUtil.waitForAll(new ArrayList<>(futures)).handle((res, ex) 
-> {
                 statFuture.complete(stats);
-            }
-        }).exceptionally(e -> {
+                return null;
+            })).exceptionally(e -> {
             statFuture.completeExceptionally(e);
             return null;
         });

Reply via email to