This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ec0215c1cf613fc71899d6156920b0f7c1e00528 Author: Lari Hotari <[email protected]> AuthorDate: Sat Jun 4 10:36:00 2022 +0300 [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918) - getLedgerMetadata is an asynchronous operation and the final result shouldn't complete before the metadata for all ledgers has been retrieved (cherry picked from commit fe19b3ca949009c952270a64ba94dbb329ea572f) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++--------- 1 file changed, 51 insertions(+), 35 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1c7297d880a..87f97ca8329 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -37,6 +37,7 @@ import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.time.Clock; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -4029,45 +4030,60 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString(); stats.state = this.getState().toString(); - stats.ledgers = Lists.newArrayList(); - this.getLedgersInfo().forEach((id, li) -> { - ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo(); - info.ledgerId = li.getLedgerId(); - info.entries = li.getEntries(); - info.size = li.getSize(); - info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); - stats.ledgers.add(info); - if (includeLedgerMetadata) { - this.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { - if (ex == null) { - info.metadata = lMetadata; - } + stats.cursors = Maps.newTreeMap(); + this.getCursors().forEach(c -> { + ManagedCursorImpl cursor = (ManagedCursorImpl) c; + PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats(); + cs.markDeletePosition = cursor.getMarkDeletedPosition().toString(); + cs.readPosition = cursor.getReadPosition().toString(); + cs.waitingReadOp = cursor.hasPendingReadRequest(); + cs.pendingReadOps = cursor.getPendingReadOpsCount(); + cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter(); + cs.cursorLedger = cursor.getCursorLedger(); + cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry(); + cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages(); + cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp()); + cs.state = cursor.getState(); + cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage(); + cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange(); + cs.properties = cursor.getProperties(); + stats.cursors.put(cursor.getName(), cs); + }); + + // make a snapshot of the ledgers infos since we are iterating it twice when metadata is included + // a list is sufficient since there's no need to lookup by the ledger id + List<LedgerInfo> ledgersInfos = new ArrayList<>(this.getLedgersInfo().values()); + + // add asynchronous metadata retrieval operations to a hashmap + Map<Long, CompletableFuture<String>> ledgerMetadataFutures = new HashMap(); + if (includeLedgerMetadata) { + ledgersInfos.forEach(li -> { + long ledgerId = li.getLedgerId(); + ledgerMetadataFutures.put(ledgerId, this.getLedgerMetadata(ledgerId).exceptionally(throwable -> { + log.warn("Getting metadata for ledger {} failed.", ledgerId, throwable); return null; - }); - } + })); + }); + } - stats.cursors = Maps.newTreeMap(); - this.getCursors().forEach(c -> { - ManagedCursorImpl cursor = (ManagedCursorImpl) c; - PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats(); - cs.markDeletePosition = cursor.getMarkDeletedPosition().toString(); - cs.readPosition = cursor.getReadPosition().toString(); - cs.waitingReadOp = cursor.hasPendingReadRequest(); - cs.pendingReadOps = cursor.getPendingReadOpsCount(); - cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter(); - cs.cursorLedger = cursor.getCursorLedger(); - cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry(); - cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages(); - cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp()); - cs.state = cursor.getState(); - cs.numberOfEntriesSinceFirstNotAckedMessage = - cursor.getNumberOfEntriesSinceFirstNotAckedMessage(); - cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange(); - cs.properties = cursor.getProperties(); - stats.cursors.put(cursor.getName(), cs); + // wait until metadata has been retrieved + FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ -> { + stats.ledgers = Lists.newArrayList(); + ledgersInfos.forEach(li -> { + ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo(); + info.ledgerId = li.getLedgerId(); + info.entries = li.getEntries(); + info.size = li.getSize(); + info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + if (includeLedgerMetadata) { + // lookup metadata from the hashmap which contains completed async operations + info.metadata = ledgerMetadataFutures.get(li.getLedgerId()).getNow(null); + } + stats.ledgers.add(info); }); + statFuture.complete(stats); }); - statFuture.complete(stats); + return statFuture; }
