This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aeb1bd1bf9a [fix][test] Fix flaky KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (#24955)
aeb1bd1bf9a is described below
commit aeb1bd1bf9a178d5506522d43c17d98b88f87796
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Nov 11 14:28:58 2025 +0800
[fix][test] Fix flaky KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (#24955)
---
.../bookkeeper/client/PulsarMockBookKeeper.java | 3 ++-
.../bookkeeper/client/PulsarMockLedgerHandle.java | 19 ++++++++++---------
.../bookkeeper/client/PulsarMockReadHandle.java | 13 ++++++-------
3 files changed, 18 insertions(+), 17 deletions(-)
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 59fb7ff5a62..9a195159377 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -260,7 +260,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
} else {
return FutureUtils.value(new
PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
lh.getLedgerMetadata(), lh.entries,
-
PulsarMockBookKeeper.this::getReadHandleInterceptor));
+
PulsarMockBookKeeper.this::getReadHandleInterceptor, lh.totalLengthCounter));
}
});
}
@@ -303,6 +303,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
}
for (PulsarMockLedgerHandle ledger : ledgers.values()) {
ledger.entries.clear();
+ ledger.totalLengthCounter.set(0);
}
scheduler.shutdown();
ledgers.clear();
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 3ed591ba135..74dab3ebbfe 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -19,17 +19,18 @@
package org.apache.bookkeeper.client;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
*/
public class PulsarMockLedgerHandle extends LedgerHandle {
- final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
+ final List<LedgerEntryImpl> entries = Collections.synchronizedList(new
ArrayList<>());
final PulsarMockBookKeeper bk;
final long id;
final DigestType digest;
@@ -63,6 +64,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
@VisibleForTesting
@Getter
boolean fenced = false;
+ // Count for total length of the entries
+ final AtomicLong totalLengthCounter = new AtomicLong(0);
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
DigestType digest, byte[] passwd) throws
GeneralSecurityException {
@@ -73,7 +76,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
this.digest = digest;
this.passwd = Arrays.copyOf(passwd, passwd.length);
- readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(),
entries, bk::getReadHandleInterceptor);
+ readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(),
entries,
+ bk::getReadHandleInterceptor, totalLengthCounter);
}
@Override
@@ -159,6 +163,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
}
lastEntry = entries.size();
+ totalLengthCounter.addAndGet(data.length);
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length,
Unpooled.wrappedBuffer(data)));
return lastEntry;
}
@@ -191,6 +196,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
lastEntry = entries.size();
byte[] storedData = new byte[data.readableBytes()];
data.readBytes(storedData);
+ totalLengthCounter.addAndGet(storedData.length);
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
storedData.length,
Unpooled.wrappedBuffer(storedData)));
return FutureUtils.value(lastEntry);
@@ -231,12 +237,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
@Override
public long getLength() {
- long length = 0;
- for (LedgerEntryImpl entry : entries) {
- length += entry.getLength();
- }
-
- return length;
+ return totalLengthCounter.get();
}
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
index 9f3f4969199..62d84dbfc40 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -42,15 +43,18 @@ class PulsarMockReadHandle implements ReadHandle {
private final LedgerMetadata metadata;
private final List<LedgerEntryImpl> entries;
private final Supplier<PulsarMockReadHandleInterceptor>
readHandleInterceptorSupplier;
+ private final AtomicLong totalLengthCounter;
PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId,
LedgerMetadata metadata,
List<LedgerEntryImpl> entries,
- Supplier<PulsarMockReadHandleInterceptor>
readHandleInterceptorSupplier) {
+ Supplier<PulsarMockReadHandleInterceptor>
readHandleInterceptorSupplier,
+ AtomicLong totalLengthCounter) {
this.bk = bk;
this.ledgerId = ledgerId;
this.metadata = metadata;
this.entries = entries;
this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
+ this.totalLengthCounter = totalLengthCounter;
}
@Override
@@ -99,12 +103,7 @@ class PulsarMockReadHandle implements ReadHandle {
@Override
public long getLength() {
- long length = 0;
- for (LedgerEntryImpl entry : entries) {
- length += entry.getLength();
- }
-
- return length;
+ return totalLengthCounter.get();
}
@Override