This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fe19b3ca949 [ML] Fix race condition in getManagedLedgerInternalStats 
when includeLedgerMetadata=true (#15918)
fe19b3ca949 is described below

commit fe19b3ca949009c952270a64ba94dbb329ea572f
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jun 4 10:36:00 2022 +0300

    [ML] Fix race condition in getManagedLedgerInternalStats when 
includeLedgerMetadata=true (#15918)
    
    - getLedgerMetadata is an asynchronous operation and the final result 
shouldn't complete before the
      metadata for all ledgers has been retrieved
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++---------
 1 file changed, 51 insertions(+), 35 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f228c32a90e..56399f74fd6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.time.Clock;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -4045,45 +4046,60 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString();
         stats.state = this.getState().toString();
 
-        stats.ledgers = Lists.newArrayList();
-        this.getLedgersInfo().forEach((id, li) -> {
-            ManagedLedgerInternalStats.LedgerInfo info = new 
ManagedLedgerInternalStats.LedgerInfo();
-            info.ledgerId = li.getLedgerId();
-            info.entries = li.getEntries();
-            info.size = li.getSize();
-            info.offloaded = li.hasOffloadContext() && 
li.getOffloadContext().getComplete();
-            stats.ledgers.add(info);
-            if (includeLedgerMetadata) {
-                this.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, 
ex) -> {
-                    if (ex == null) {
-                        info.metadata = lMetadata;
-                    }
+        stats.cursors = Maps.newTreeMap();
+        this.getCursors().forEach(c -> {
+            ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+            PersistentTopicInternalStats.CursorStats cs = new 
PersistentTopicInternalStats.CursorStats();
+            cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+            cs.readPosition = cursor.getReadPosition().toString();
+            cs.waitingReadOp = cursor.hasPendingReadRequest();
+            cs.pendingReadOps = cursor.getPendingReadOpsCount();
+            cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+            cs.cursorLedger = cursor.getCursorLedger();
+            cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+            cs.individuallyDeletedMessages = 
cursor.getIndividuallyDeletedMessages();
+            cs.lastLedgerSwitchTimestamp = 
DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
+            cs.state = cursor.getState();
+            cs.numberOfEntriesSinceFirstNotAckedMessage = 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
+            cs.totalNonContiguousDeletedMessagesRange = 
cursor.getTotalNonContiguousDeletedMessagesRange();
+            cs.properties = cursor.getProperties();
+            stats.cursors.put(cursor.getName(), cs);
+        });
+
+        // make a snapshot of the ledgers infos since we are iterating it 
twice when metadata is included
+        // a list is sufficient since there's no need to lookup by the ledger 
id
+        List<LedgerInfo> ledgersInfos = new 
ArrayList<>(this.getLedgersInfo().values());
+
+        // add asynchronous metadata retrieval operations to a hashmap
+        Map<Long, CompletableFuture<String>> ledgerMetadataFutures = new 
HashMap();
+        if (includeLedgerMetadata) {
+            ledgersInfos.forEach(li -> {
+                long ledgerId = li.getLedgerId();
+                ledgerMetadataFutures.put(ledgerId, 
this.getLedgerMetadata(ledgerId).exceptionally(throwable -> {
+                    log.warn("Getting metadata for ledger {} failed.", 
ledgerId, throwable);
                     return null;
-                });
-            }
+                }));
+            });
+        }
 
-            stats.cursors = Maps.newTreeMap();
-            this.getCursors().forEach(c -> {
-                ManagedCursorImpl cursor = (ManagedCursorImpl) c;
-                PersistentTopicInternalStats.CursorStats cs = new 
PersistentTopicInternalStats.CursorStats();
-                cs.markDeletePosition = 
cursor.getMarkDeletedPosition().toString();
-                cs.readPosition = cursor.getReadPosition().toString();
-                cs.waitingReadOp = cursor.hasPendingReadRequest();
-                cs.pendingReadOps = cursor.getPendingReadOpsCount();
-                cs.messagesConsumedCounter = 
cursor.getMessagesConsumedCounter();
-                cs.cursorLedger = cursor.getCursorLedger();
-                cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
-                cs.individuallyDeletedMessages = 
cursor.getIndividuallyDeletedMessages();
-                cs.lastLedgerSwitchTimestamp = 
DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
-                cs.state = cursor.getState();
-                cs.numberOfEntriesSinceFirstNotAckedMessage =
-                        cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
-                cs.totalNonContiguousDeletedMessagesRange = 
cursor.getTotalNonContiguousDeletedMessagesRange();
-                cs.properties = cursor.getProperties();
-                stats.cursors.put(cursor.getName(), cs);
+        // wait until metadata has been retrieved
+        FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ -> 
{
+            stats.ledgers = Lists.newArrayList();
+            ledgersInfos.forEach(li -> {
+                ManagedLedgerInternalStats.LedgerInfo info = new 
ManagedLedgerInternalStats.LedgerInfo();
+                info.ledgerId = li.getLedgerId();
+                info.entries = li.getEntries();
+                info.size = li.getSize();
+                info.offloaded = li.hasOffloadContext() && 
li.getOffloadContext().getComplete();
+                if (includeLedgerMetadata) {
+                    // lookup metadata from the hashmap which contains 
completed async operations
+                    info.metadata = 
ledgerMetadataFutures.get(li.getLedgerId()).getNow(null);
+                }
+                stats.ledgers.add(info);
             });
+            statFuture.complete(stats);
         });
-        statFuture.complete(stats);
+
         return statFuture;
     }
 

Reply via email to