This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fb8a0e87b97ad5cc2c183b5d895f7c68bbf467b1 Author: Ruimin MA <[email protected]> AuthorDate: Tue Nov 11 14:28:58 2025 +0800 [fix][test] Fix flaky KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (#24955) (cherry picked from commit aeb1bd1bf9a178d5506522d43c17d98b88f87796) --- .../bookkeeper/client/PulsarMockBookKeeper.java | 3 ++- .../bookkeeper/client/PulsarMockLedgerHandle.java | 19 ++++++++++--------- .../bookkeeper/client/PulsarMockReadHandle.java | 13 ++++++------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 59fb7ff5a62..9a195159377 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -260,7 +260,7 @@ public class PulsarMockBookKeeper extends BookKeeper { } else { return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId, lh.getLedgerMetadata(), lh.entries, - PulsarMockBookKeeper.this::getReadHandleInterceptor)); + PulsarMockBookKeeper.this::getReadHandleInterceptor, lh.totalLengthCounter)); } }); } @@ -303,6 +303,7 @@ public class PulsarMockBookKeeper extends BookKeeper { } for (PulsarMockLedgerHandle ledger : ledgers.values()) { ledger.entries.clear(); + ledger.totalLengthCounter.set(0); } scheduler.shutdown(); ledgers.clear(); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 3ed591ba135..74dab3ebbfe 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -19,17 +19,18 @@ package org.apache.bookkeeper.client; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.security.GeneralSecurityException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import lombok.Getter; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; @@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory; */ public class PulsarMockLedgerHandle extends LedgerHandle { - final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList(); + final List<LedgerEntryImpl> entries = Collections.synchronizedList(new ArrayList<>()); final PulsarMockBookKeeper bk; final long id; final DigestType digest; @@ -63,6 +64,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle { @VisibleForTesting @Getter boolean fenced = false; + // Count for total length of the entries + final AtomicLong totalLengthCounter = new AtomicLong(0); public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException { @@ -73,7 +76,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle { this.digest = digest; this.passwd = Arrays.copyOf(passwd, passwd.length); - readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor); + readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, + bk::getReadHandleInterceptor, totalLengthCounter); } @Override @@ -159,6 +163,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle { } lastEntry = entries.size(); + totalLengthCounter.addAndGet(data.length); entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data))); return lastEntry; } @@ -191,6 +196,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle { lastEntry = entries.size(); byte[] storedData = new byte[data.readableBytes()]; data.readBytes(storedData); + totalLengthCounter.addAndGet(storedData.length); entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, storedData.length, Unpooled.wrappedBuffer(storedData))); return FutureUtils.value(lastEntry); @@ -231,12 +237,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle { @Override public long getLength() { - long length = 0; - for (LedgerEntryImpl entry : entries) { - length += entry.getLength(); - } - - return length; + return totalLengthCounter.get(); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index 9f3f4969199..62d84dbfc40 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.client; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -42,15 +43,18 @@ class PulsarMockReadHandle implements ReadHandle { private final LedgerMetadata metadata; private final List<LedgerEntryImpl> entries; private final Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier; + private final AtomicLong totalLengthCounter; PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, List<LedgerEntryImpl> entries, - Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier) { + Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier, + AtomicLong totalLengthCounter) { this.bk = bk; this.ledgerId = ledgerId; this.metadata = metadata; this.entries = entries; this.readHandleInterceptorSupplier = readHandleInterceptorSupplier; + this.totalLengthCounter = totalLengthCounter; } @Override @@ -99,12 +103,7 @@ class PulsarMockReadHandle implements ReadHandle { @Override public long getLength() { - long length = 0; - for (LedgerEntryImpl entry : entries) { - length += entry.getLength(); - } - - return length; + return totalLengthCounter.get(); } @Override
