This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 5aebfedeca8 [fix][ml] PIP-430: Fix concurrency issue in MessageMetadata caching and improve caching (#24836)
5aebfedeca8 is described below
commit 5aebfedeca869c8407dd9706a6ec34b57372a294
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 10 17:33:15 2025 +0300
[fix][ml] PIP-430: Fix concurrency issue in MessageMetadata caching and improve caching (#24836)
(cherry picked from commit 9737d038485e0c15f251dc334d6963fd0207953e)
---
.../main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 8 +++++---
.../bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java | 8 +++++---
.../apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java | 3 ++-
.../bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java | 4 ++--
4 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 070a0fc1bea..8a48dbb7ea1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -150,13 +150,15 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
}
public static EntryImpl createWithRetainedDuplicate(Position position,
ByteBuf data,
- EntryReadCountHandler
entryReadCountHandler) {
+ EntryReadCountHandler
entryReadCountHandler,
+ MessageMetadata
messageMetadata) {
EntryImpl entry = RECYCLER.get();
entry.position = PositionFactory.create(position);
entry.ledgerId = position.getLedgerId();
entry.entryId = position.getEntryId();
entry.data = data.retainedDuplicate();
entry.readCountHandler = entryReadCountHandler;
+ entry.messageMetadata = messageMetadata;
entry.setRefCnt(1);
return entry;
}
@@ -305,11 +307,11 @@ public final class EntryImpl extends
AbstractCASReferenceCounted
decreaseReadCountOnRelease = enabled;
}
- public void initializeMessageMetadataIfNeeded(String managedLedgerName) {
+ public synchronized void initializeMessageMetadataIfNeeded(String
managedLedgerName) {
if (messageMetadata == null) {
try {
MessageMetadata msgMetadata = new MessageMetadata();
- Commands.peekMessageMetadata(data, msgMetadata);
+ Commands.parseMessageMetadata(data.duplicate(), msgMetadata);
this.messageMetadata = msgMetadata;
} catch (Throwable t) {
log.warn("[{}] Failed to parse message metadata for entry
{}:{}", managedLedgerName, ledgerId, entryId,
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
index 5c82207e1c7..d3dd9de038b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
@@ -48,6 +48,7 @@ class RangeCacheEntryWrapper {
long size;
long timestampNanos;
int requeueCount;
+ boolean messageMetadataInitialized;
volatile boolean accessed;
private RangeCacheEntryWrapper(Recycler.Handle<RangeCacheEntryWrapper>
recyclerHandle) {
@@ -114,12 +115,12 @@ class RangeCacheEntryWrapper {
long stamp = lock.tryOptimisticRead();
Position localKey = this.key;
ReferenceCountedEntry localValue = this.value;
- boolean messageMetadataInitialized = localValue != null &&
localValue.getMessageMetadata() != null;
+ boolean messageMetadataInitialized = this.messageMetadataInitialized;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
localKey = this.key;
localValue = this.value;
- messageMetadataInitialized = localValue != null &&
localValue.getMessageMetadata() != null;
+ messageMetadataInitialized = this.messageMetadataInitialized;
lock.unlockRead(stamp);
}
// check that the given key matches the key associated with the value
in the entry
@@ -136,8 +137,9 @@ class RangeCacheEntryWrapper {
if (wrapper.key != key && (requireSameKeyInstance ||
wrapper.key == null || !wrapper.key.equals(key))) {
return null;
}
- if (wrapper.value instanceof EntryImpl entry) {
+ if (wrapper.value instanceof EntryImpl entry &&
!this.messageMetadataInitialized) {
entry.initializeMessageMetadataIfNeeded(managedLedgerName);
+ this.messageMetadataInitialized = true;
}
return wrapper.value;
});
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index fd391ba2bf6..f8ee2c431b1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -152,7 +152,8 @@ public class RangeEntryCacheImpl implements EntryCache {
Position position = entry.getPosition();
ReferenceCountedEntry cacheEntry =
- EntryImpl.createWithRetainedDuplicate(position, cachedData,
entry.getReadCountHandler());
+ EntryImpl.createWithRetainedDuplicate(position, cachedData,
entry.getReadCountHandler(),
+ entry.getMessageMetadata());
cachedData.release();
if (entries.put(position, cacheEntry, entryLength)) {
totalAddedEntriesSize.add(entryLength);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
index f7b6e755bce..285cd251a9f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
@@ -309,10 +309,10 @@ public class ManagedCursorConcurrencyTest extends
MockedBookKeeperTestCase {
assertEquals(cursor.getMarkDeletedPosition(),
addedEntries.get(addedEntries.size() - 1));
}
- @Test(timeOut = 30000)
+ @Test(timeOut = 30000, invocationCount = 10)
public void testConcurrentReadOfSameEntry() throws Exception {
ManagedLedger ledger = factory.open("testConcurrentReadOfSameEntry",
new ManagedLedgerConfig());
- final int numCursors = 5;
+ final int numCursors = 20;
final List<ManagedCursor> cursors = new ArrayList();
for (int i = 0; i < numCursors; i++) {
final ManagedCursor cursor = ledger.openCursor("c" + i);