This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aeb1bd1bf9a [fix][test] Fix flaky 
KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (#24955)
aeb1bd1bf9a is described below

commit aeb1bd1bf9a178d5506522d43c17d98b88f87796
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Nov 11 14:28:58 2025 +0800

    [fix][test] Fix flaky 
KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (#24955)
---
 .../bookkeeper/client/PulsarMockBookKeeper.java       |  3 ++-
 .../bookkeeper/client/PulsarMockLedgerHandle.java     | 19 ++++++++++---------
 .../bookkeeper/client/PulsarMockReadHandle.java       | 13 ++++++-------
 3 files changed, 18 insertions(+), 17 deletions(-)

diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 59fb7ff5a62..9a195159377 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -260,7 +260,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
                             } else {
                                 return FutureUtils.value(new 
PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
                                         lh.getLedgerMetadata(), lh.entries,
-                                        
PulsarMockBookKeeper.this::getReadHandleInterceptor));
+                                        
PulsarMockBookKeeper.this::getReadHandleInterceptor, lh.totalLengthCounter));
                             }
                         });
             }
@@ -303,6 +303,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
         }
         for (PulsarMockLedgerHandle ledger : ledgers.values()) {
             ledger.entries.clear();
+            ledger.totalLengthCounter.set(0);
         }
         scheduler.shutdown();
         ledgers.clear();
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 3ed591ba135..74dab3ebbfe 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -19,17 +19,18 @@
 package org.apache.bookkeeper.client;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.security.GeneralSecurityException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Getter;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
  */
 public class PulsarMockLedgerHandle extends LedgerHandle {
 
-    final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
+    final List<LedgerEntryImpl> entries = Collections.synchronizedList(new 
ArrayList<>());
     final PulsarMockBookKeeper bk;
     final long id;
     final DigestType digest;
@@ -63,6 +64,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     @VisibleForTesting
     @Getter
     boolean fenced = false;
+    // Count for total length of the entries
+    final AtomicLong totalLengthCounter = new AtomicLong(0);
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
                            DigestType digest, byte[] passwd) throws 
GeneralSecurityException {
@@ -73,7 +76,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         this.digest = digest;
         this.passwd = Arrays.copyOf(passwd, passwd.length);
 
-        readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), 
entries, bk::getReadHandleInterceptor);
+        readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), 
entries,
+                bk::getReadHandleInterceptor, totalLengthCounter);
     }
 
     @Override
@@ -159,6 +163,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         }
 
         lastEntry = entries.size();
+        totalLengthCounter.addAndGet(data.length);
         entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, 
Unpooled.wrappedBuffer(data)));
         return lastEntry;
     }
@@ -191,6 +196,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
                     lastEntry = entries.size();
                     byte[] storedData = new byte[data.readableBytes()];
                     data.readBytes(storedData);
+                    totalLengthCounter.addAndGet(storedData.length);
                     entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
                                                        storedData.length, 
Unpooled.wrappedBuffer(storedData)));
                     return FutureUtils.value(lastEntry);
@@ -231,12 +237,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
 
     @Override
     public long getLength() {
-        long length = 0;
-        for (LedgerEntryImpl entry : entries) {
-            length += entry.getLength();
-        }
-
-        return length;
+        return totalLengthCounter.get();
     }
 
 
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
index 9f3f4969199..62d84dbfc40 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.client;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -42,15 +43,18 @@ class PulsarMockReadHandle implements ReadHandle {
     private final LedgerMetadata metadata;
     private final List<LedgerEntryImpl> entries;
     private final Supplier<PulsarMockReadHandleInterceptor> 
readHandleInterceptorSupplier;
+    private final AtomicLong totalLengthCounter;
 
     PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, 
LedgerMetadata metadata,
                          List<LedgerEntryImpl> entries,
-                         Supplier<PulsarMockReadHandleInterceptor> 
readHandleInterceptorSupplier) {
+                         Supplier<PulsarMockReadHandleInterceptor> 
readHandleInterceptorSupplier,
+                         AtomicLong totalLengthCounter) {
         this.bk = bk;
         this.ledgerId = ledgerId;
         this.metadata = metadata;
         this.entries = entries;
         this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
+        this.totalLengthCounter = totalLengthCounter;
     }
 
     @Override
@@ -99,12 +103,7 @@ class PulsarMockReadHandle implements ReadHandle {
 
     @Override
     public long getLength() {
-        long length = 0;
-        for (LedgerEntryImpl entry : entries) {
-            length += entry.getLength();
-        }
-
-        return length;
+        return totalLengthCounter.get();
     }
 
     @Override

Reply via email to