This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fe19b3ca949 [ML] Fix race condition in getManagedLedgerInternalStats
when includeLedgerMetadata=true (#15918)
fe19b3ca949 is described below
commit fe19b3ca949009c952270a64ba94dbb329ea572f
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jun 4 10:36:00 2022 +0300
[ML] Fix race condition in getManagedLedgerInternalStats when
includeLedgerMetadata=true (#15918)
- getLedgerMetadata is an asynchronous operation and the final result
shouldn't complete before the metadata for all ledgers has been retrieved
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++---------
1 file changed, 51 insertions(+), 35 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f228c32a90e..56399f74fd6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Clock;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -4045,45 +4046,60 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString();
stats.state = this.getState().toString();
- stats.ledgers = Lists.newArrayList();
- this.getLedgersInfo().forEach((id, li) -> {
- ManagedLedgerInternalStats.LedgerInfo info = new
ManagedLedgerInternalStats.LedgerInfo();
- info.ledgerId = li.getLedgerId();
- info.entries = li.getEntries();
- info.size = li.getSize();
- info.offloaded = li.hasOffloadContext() &&
li.getOffloadContext().getComplete();
- stats.ledgers.add(info);
- if (includeLedgerMetadata) {
- this.getLedgerMetadata(li.getLedgerId()).handle((lMetadata,
ex) -> {
- if (ex == null) {
- info.metadata = lMetadata;
- }
+ stats.cursors = Maps.newTreeMap();
+ this.getCursors().forEach(c -> {
+ ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+ PersistentTopicInternalStats.CursorStats cs = new
PersistentTopicInternalStats.CursorStats();
+ cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+ cs.readPosition = cursor.getReadPosition().toString();
+ cs.waitingReadOp = cursor.hasPendingReadRequest();
+ cs.pendingReadOps = cursor.getPendingReadOpsCount();
+ cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+ cs.cursorLedger = cursor.getCursorLedger();
+ cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+ cs.individuallyDeletedMessages =
cursor.getIndividuallyDeletedMessages();
+ cs.lastLedgerSwitchTimestamp =
DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
+ cs.state = cursor.getState();
+ cs.numberOfEntriesSinceFirstNotAckedMessage =
cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
+ cs.totalNonContiguousDeletedMessagesRange =
cursor.getTotalNonContiguousDeletedMessagesRange();
+ cs.properties = cursor.getProperties();
+ stats.cursors.put(cursor.getName(), cs);
+ });
+
+ // make a snapshot of the ledgers infos since we are iterating it
twice when metadata is included
+ // a list is sufficient since there's no need to lookup by the ledger
id
+ List<LedgerInfo> ledgersInfos = new
ArrayList<>(this.getLedgersInfo().values());
+
+ // add asynchronous metadata retrieval operations to a hashmap
+ Map<Long, CompletableFuture<String>> ledgerMetadataFutures = new
HashMap();
+ if (includeLedgerMetadata) {
+ ledgersInfos.forEach(li -> {
+ long ledgerId = li.getLedgerId();
+ ledgerMetadataFutures.put(ledgerId,
this.getLedgerMetadata(ledgerId).exceptionally(throwable -> {
+ log.warn("Getting metadata for ledger {} failed.",
ledgerId, throwable);
return null;
- });
- }
+ }));
+ });
+ }
- stats.cursors = Maps.newTreeMap();
- this.getCursors().forEach(c -> {
- ManagedCursorImpl cursor = (ManagedCursorImpl) c;
- PersistentTopicInternalStats.CursorStats cs = new
PersistentTopicInternalStats.CursorStats();
- cs.markDeletePosition =
cursor.getMarkDeletedPosition().toString();
- cs.readPosition = cursor.getReadPosition().toString();
- cs.waitingReadOp = cursor.hasPendingReadRequest();
- cs.pendingReadOps = cursor.getPendingReadOpsCount();
- cs.messagesConsumedCounter =
cursor.getMessagesConsumedCounter();
- cs.cursorLedger = cursor.getCursorLedger();
- cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
- cs.individuallyDeletedMessages =
cursor.getIndividuallyDeletedMessages();
- cs.lastLedgerSwitchTimestamp =
DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
- cs.state = cursor.getState();
- cs.numberOfEntriesSinceFirstNotAckedMessage =
- cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
- cs.totalNonContiguousDeletedMessagesRange =
cursor.getTotalNonContiguousDeletedMessagesRange();
- cs.properties = cursor.getProperties();
- stats.cursors.put(cursor.getName(), cs);
+ // wait until metadata has been retrieved
+ FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ ->
{
+ stats.ledgers = Lists.newArrayList();
+ ledgersInfos.forEach(li -> {
+ ManagedLedgerInternalStats.LedgerInfo info = new
ManagedLedgerInternalStats.LedgerInfo();
+ info.ledgerId = li.getLedgerId();
+ info.entries = li.getEntries();
+ info.size = li.getSize();
+ info.offloaded = li.hasOffloadContext() &&
li.getOffloadContext().getComplete();
+ if (includeLedgerMetadata) {
+ // lookup metadata from the hashmap which contains
completed async operations
+ info.metadata =
ledgerMetadataFutures.get(li.getLedgerId()).getNow(null);
+ }
+ stats.ledgers.add(info);
});
+ statFuture.complete(stats);
});
- statFuture.complete(stats);
+
return statFuture;
}