This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a303ac9d561ccd068d1efa3a06ceafd2e80f1ee8
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 12 18:07:39 2026 +0300

    [fix][test][branch-4.0] Backport configurable read/add delays in PulsarMockBookKeeper
    
    Partial cherry-pick of the testmocks changes from
    490ba0cca18 ([improve][broker] Implement PIP-430 Pulsar Broker cache
    improvements (#24623)), without the JFR read event interceptor parts
    which aren't needed on branch-4.0.
    
    Required so that CompactionTest compiles after cherry-picking
    ded1e42d352 (#25998), which uses
    PulsarMockBookKeeper.setDefaultReadEntriesDelayMillis.
---
 .../bookkeeper/client/PulsarMockBookKeeper.java    | 30 ++++++++++++++++++
 .../bookkeeper/client/PulsarMockLedgerHandle.java  | 36 +++++++++++++---------
 2 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 7104ded7460..7bebc0e558f 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -75,6 +75,8 @@ public class PulsarMockBookKeeper extends BookKeeper {
     final OrderedExecutor orderedExecutor;
     final ExecutorService executor;
     final ScheduledExecutorService scheduler;
+    private volatile long defaultAddEntryDelayMillis = 1L;
+    private volatile long defaultReadEntriesDelayMillis = 1L;
 
     @Override
     public ClientConfiguration getConf() {
@@ -492,5 +494,33 @@ public class PulsarMockBookKeeper extends BookKeeper {
         return metadataClientDriver;
     }
 
+    public long getReadEntriesDelayMillis() {
+        return defaultReadEntriesDelayMillis;
+    }
+
+    public long getNextAddEntryDelayMillis() {
+        Long delay = addEntryDelaysMillis.poll();
+        if (delay != null) {
+            return delay;
+        }
+        return defaultAddEntryDelayMillis;
+    }
+
+    public long getNextAddEntryResponseDelayMillis() {
+        Long delay = addEntryResponseDelaysMillis.poll();
+        if (delay != null) {
+            return delay;
+        }
+        return 0;
+    }
+
+    public void setDefaultAddEntryDelayMillis(long defaultAddEntryDelayMillis) 
{
+        this.defaultAddEntryDelayMillis = defaultAddEntryDelayMillis;
+    }
+
+    public void setDefaultReadEntriesDelayMillis(long 
defaultReadEntriesDelayMillis) {
+        this.defaultReadEntriesDelayMillis = defaultReadEntriesDelayMillis;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarMockBookKeeper.class);
 }
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 4d1fd1380c8..a5403774133 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -113,18 +113,26 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     @Override
     public void asyncReadEntries(final long firstEntry, final long lastEntry, 
final ReadCallback cb, final Object ctx) {
         bk.getProgrammedFailure().thenComposeAsync((res) -> {
-                log.debug("readEntries: first={} last={} total={}", 
firstEntry, lastEntry, entries.size());
+                if (log.isDebugEnabled()) {
+                    log.debug("readEntries: first={} last={} total={}", 
firstEntry, lastEntry, entries.size());
+                }
                 final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
                 long entryId = firstEntry;
                 while (entryId <= lastEntry && entryId < entries.size()) {
                     seq.add(new LedgerEntry(entries.get((int) 
entryId++).duplicate()));
                 }
 
-                log.debug("Entries read: {}", seq);
+                if (log.isDebugEnabled()) {
+                    log.debug("Entries read: {}", seq);
+                }
 
-                try {
-                    Thread.sleep(1);
-                } catch (InterruptedException e) {
+                long readEntriesDelay = bk.getReadEntriesDelayMillis();
+                if (readEntriesDelay > 0) {
+                    try {
+                        Thread.sleep(readEntriesDelay);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
                 }
 
                 Enumeration<LedgerEntry> entries = new 
Enumeration<LedgerEntry>() {
@@ -182,14 +190,12 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     @Override
     public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final 
Object ctx) {
         bk.getAddEntryFailure().thenComposeAsync((res) -> {
-                Long delayMillis = bk.addEntryDelaysMillis.poll();
-                if (delayMillis == null) {
-                    delayMillis = 1L;
-                }
-
-                try {
-                    Thread.sleep(delayMillis);
-                } catch (InterruptedException e) {
+                long delayMillis = bk.getNextAddEntryDelayMillis();
+                if (delayMillis > 0) {
+                    try {
+                        Thread.sleep(delayMillis);
+                    } catch (InterruptedException e) {
+                    }
                 }
 
                 if (fenced) {
@@ -211,8 +217,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
                         
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
                                        PulsarMockLedgerHandle.this, 
LedgerHandle.INVALID_ENTRY_ID, ctx);
                     } else {
-                        Long responseDelayMillis = 
bk.addEntryResponseDelaysMillis.poll();
-                        if (responseDelayMillis != null) {
+                        long responseDelayMillis = 
bk.getNextAddEntryResponseDelayMillis();
+                        if (responseDelayMillis > 0) {
                             try {
                                 Thread.sleep(responseDelayMillis);
                             } catch (InterruptedException e) {

Reply via email to