This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 65ddd44f2d6 [fix][broker] Prevent unexpected recycle failure in 
dispatcher's read callback (#24741)
65ddd44f2d6 is described below

commit 65ddd44f2d6a04bd8adec2d82a44380a5a6c156b
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Sep 18 11:13:21 2025 +0800

    [fix][broker] Prevent unexpected recycle failure in dispatcher's read 
callback (#24741)
    
    (cherry picked from commit ccbd245aafb2b09d8f2e306de44b11134120c5c7)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   3 +-
 .../mledger/util/ManagedLedgerUtils.java           |  29 +++++-
 .../mledger/ManagedLedgerReplayTaskTest.java       |   5 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  23 +++++
 .../PersistentDispatcherSingleActiveConsumer.java  | 102 ++++++---------------
 .../apache/pulsar/compaction/CompactedTopic.java   |   9 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  83 -----------------
 .../pulsar/compaction/CompactedTopicUtils.java     |  42 +++------
 .../broker/BrokerMessageDeduplicationTest.java     |   3 +-
 ...rsistentDispatcherSingleActiveConsumerTest.java |  13 ++-
 .../pulsar/compaction/CompactedTopicUtilsTest.java |   6 +-
 11 files changed, 112 insertions(+), 206 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 74b81153621..05e5e33df66 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -101,6 +101,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -219,7 +220,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     private boolean alwaysInactive = false;
 
-    private static final long NO_MAX_SIZE_LIMIT = -1L;
+    private static final long NO_MAX_SIZE_LIMIT = 
ManagedLedgerUtils.NO_MAX_SIZE_LIMIT;
 
     private long entriesReadCount;
     private long entriesReadSize;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java
index e89e3ec349d..cbd43747cbb 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.util;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -29,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.jspecify.annotations.Nullable;
 
 /**
  * This util class contains some future-based methods to replace 
callback-based APIs. With a callback-based API, if any
@@ -40,6 +42,8 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public class ManagedLedgerUtils {
 
+    public static final long NO_MAX_SIZE_LIMIT = -1L;
+
     public static CompletableFuture<ManagedCursor> openCursor(ManagedLedger 
ml, String cursorName) {
         final var future = new CompletableFuture<ManagedCursor>();
         ml.asyncOpenCursor(cursorName, new AsyncCallbacks.OpenCursorCallback() 
{
@@ -58,8 +62,13 @@ public class ManagedLedgerUtils {
 
     public static CompletableFuture<List<Entry>> readEntries(ManagedCursor 
cursor, int numberOfEntriesToRead,
                                                              Position 
maxPosition) {
+        return readEntries(cursor, numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, 
maxPosition);
+    }
+
+    public static CompletableFuture<List<Entry>> readEntries(ManagedCursor 
cursor, int numberOfEntriesToRead,
+                                                             long maxBytes, 
Position maxPosition) {
         final var future = new CompletableFuture<List<Entry>>();
-        cursor.asyncReadEntries(numberOfEntriesToRead, new 
AsyncCallbacks.ReadEntriesCallback() {
+        cursor.asyncReadEntries(numberOfEntriesToRead, maxBytes, new 
AsyncCallbacks.ReadEntriesCallback() {
             @Override
             public void readEntriesComplete(List<Entry> entries, Object ctx) {
                 future.complete(entries);
@@ -73,6 +82,24 @@ public class ManagedLedgerUtils {
         return future;
     }
 
+    public static CompletableFuture<List<Entry>> readEntriesWithSkipOrWait(
+            ManagedCursor cursor, int maxEntries, long maxSizeBytes, 
PositionImpl maxPosition,
+            @Nullable Predicate<PositionImpl> skipCondition) {
+        final var future = new CompletableFuture<List<Entry>>();
+        cursor.asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, new 
AsyncCallbacks.ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                future.complete(entries);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null, maxPosition, skipCondition);
+        return future;
+    }
+
     public static CompletableFuture<Void> markDelete(ManagedCursor cursor, 
Position position,
                                                      Map<String, Long> 
properties) {
         final var future = new CompletableFuture<Void>();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java
index 56c827b2166..177b6773058 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
@@ -128,11 +129,11 @@ public class ManagedLedgerReplayTaskTest extends 
MockedBookKeeperTestCase {
             }
         }).when(cursor).hasMoreEntries();
         doAnswer(invocation -> {
-            final var callback = (AsyncCallbacks.ReadEntriesCallback) 
invocation.getArgument(1);
+            final var callback = (AsyncCallbacks.ReadEntriesCallback) 
invocation.getArgument(2);
             final var entries = List.<Entry>of(EntryImpl.create(1, 1, 
"msg".getBytes()));
             callback.readEntriesComplete(entries, null);
             return null;
-        }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any());
+        }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(), 
any());
 
         try {
             replayTask.replay(cursor, (__, ___) -> {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 3d258c9ec7f..4683b5f6759 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -102,6 +102,7 @@ import 
org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.collections.iterators.EmptyIterator;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -5317,5 +5318,27 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         }).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + 
i).toList());
     }
 
+    @Test
+    public void testConcurrentRead() throws Exception {
+        final var config = new ManagedLedgerConfig();
+        config.setReadEntryTimeoutSeconds(0);
+        config.setNewEntriesCheckDelayInMillis(1000);
+        @Cleanup final var ledger = factory.open("testConcurrentRead", config);
+        final var cursor = ledger.openCursor("cursor");
+        final var future1 = 
ManagedLedgerUtils.readEntriesWithSkipOrWait(cursor, 10, Integer.MAX_VALUE,
+                PositionImpl.LATEST, null);
+        final var future2 = 
ManagedLedgerUtils.readEntriesWithSkipOrWait(cursor, 10, Integer.MAX_VALUE,
+                PositionImpl.LATEST, null);
+        assertTrue(future2.isCompletedExceptionally());
+        try {
+            future2.get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
ManagedLedgerException.ConcurrentWaitCallbackException);
+        }
+        ledger.addEntry("msg".getBytes());
+        assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), 
"msg".getBytes());
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bb8843b9397..1b61ac3923e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -18,20 +18,19 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static 
org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntriesWithSkipOrWait;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.Recycler;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Getter;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -54,14 +53,14 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.compaction.CompactedTopicUtils;
 import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.compaction.TopicCompactionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcherSingleActiveConsumer
-        implements Dispatcher, ReadEntriesCallback {
+        implements Dispatcher {
 
     private final AtomicBoolean isRescheduleReadInProgress = new 
AtomicBoolean(false);
     protected final PersistentTopic topic;
@@ -154,16 +153,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         }
     }
 
-    @Override
-    public void readEntriesComplete(final List<Entry> entries, Object obj) {
-        executor.execute(() -> internalReadEntriesComplete(entries, obj));
-    }
-
-    private synchronized void internalReadEntriesComplete(final List<Entry> 
entries, Object obj) {
-        ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) obj;
-        Consumer readConsumer = readEntriesCtx.getConsumer();
-        long epoch = readEntriesCtx.getEpoch();
-        readEntriesCtx.recycle();
+    private synchronized void readEntriesComplete(List<Entry> entries, 
Consumer readConsumer, long epoch) {
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Got messages: {}", name, readConsumer, 
entries.size());
         }
@@ -376,19 +366,27 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                     log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
                 }
                 havePendingRead = true;
+                // TODO: should we pass the consumer epoch for compacted read 
path? See
+                //   https://github.com/apache/pulsar/issues/13690
+                final var epoch = consumer.readCompacted() ? 
DEFAULT_CONSUMER_EPOCH : consumer.getConsumerEpoch();
+                final CompletableFuture<List<Entry>> entriesFuture;
                 if (consumer.readCompacted()) {
                     boolean readFromEarliest = isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId())
                             && (!cursor.isDurable() || 
cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
                             || hasValidMarkDeletePosition(cursor));
-                    TopicCompactionService topicCompactionService = 
topic.getTopicCompactionService();
-                    
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, 
messagesToRead,
-                            bytesToRead, topic.getMaxReadPosition(), 
readFromEarliest, this, true, consumer);
+                    entriesFuture = 
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(),
+                            cursor, messagesToRead, bytesToRead, 
topic.getMaxReadPosition(), readFromEarliest, true);
                 } else {
-                    ReadEntriesCtx readEntriesCtx =
-                            ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
-                    cursor.asyncReadEntriesOrWait(messagesToRead,
-                            bytesToRead, this, readEntriesCtx, 
topic.getMaxReadPosition());
+                    entriesFuture = readEntriesWithSkipOrWait(cursor, 
messagesToRead, bytesToRead,
+                            topic.getMaxReadPosition(), null);
                 }
+                entriesFuture.whenCompleteAsync((entries, e) -> {
+                    if (e == null) {
+                        readEntriesComplete(entries, consumer, epoch);
+                    } else {
+                        readEntriesFailed(e, consumer);
+                    }
+                }, executor);
             }
         } else {
             if (log.isDebugEnabled()) {
@@ -458,16 +456,10 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         return Pair.of(messagesToRead, bytesToRead);
     }
 
-    @Override
-    public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
-        executor.execute(() -> internalReadEntriesFailed(exception, ctx));
-    }
-
-    private synchronized void internalReadEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
+    @VisibleForTesting
+    public synchronized void readEntriesFailed(Throwable throwable, Consumer 
consumer) {
         havePendingRead = false;
-        ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) ctx;
-        Consumer c = readEntriesCtx.getConsumer();
-        readEntriesCtx.recycle();
+        final var exception = FutureUtil.unwrapCompletionException(throwable);
 
         // Do not keep reading messages from a closed cursor.
         if (exception instanceof 
ManagedLedgerException.CursorAlreadyClosedException) {
@@ -499,21 +491,19 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                         exception.getMessage(), waitTimeMillis / 1000.0);
             }
         } else if (!(exception instanceof TooManyRequestsException)) {
-            log.error("[{}-{}] Error reading entries at {} : {} - Retrying to 
read in {} seconds", name, c,
+            log.error("[{}-{}] Error reading entries at {} : {} - Retrying to 
read in {} seconds", name, consumer,
                     cursor.getReadPosition(), exception.getMessage(), 
waitTimeMillis / 1000.0);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Got throttled by bookies while reading at 
{} : {} - Retrying to read in {} seconds",
-                        name, c, cursor.getReadPosition(), 
exception.getMessage(), waitTimeMillis / 1000.0);
+                        name, consumer, cursor.getReadPosition(), 
exception.getMessage(), waitTimeMillis / 1000.0);
             }
         }
 
-        Objects.requireNonNull(c);
-
         // Reduce read batch size to avoid flooding bookies with retries
         readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
 
-        scheduleReadEntriesWithDelay(c, waitTimeMillis);
+        scheduleReadEntriesWithDelay(consumer, waitTimeMillis);
     }
 
     @VisibleForTesting
@@ -586,44 +576,4 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
-
-    public static class ReadEntriesCtx {
-
-        private Consumer consumer;
-        private long epoch;
-
-        private final Recycler.Handle<ReadEntriesCtx> recyclerHandle;
-
-        private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle) 
{
-            this.recyclerHandle = recyclerHandle;
-        }
-        private static final Recycler<ReadEntriesCtx> RECYCLER =
-                new Recycler<ReadEntriesCtx>() {
-            @Override
-            protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> 
recyclerHandle) {
-                return new ReadEntriesCtx(recyclerHandle);
-            }
-        };
-
-        public static ReadEntriesCtx create(Consumer consumer, long epoch) {
-            ReadEntriesCtx readEntriesCtx = RECYCLER.get();
-            readEntriesCtx.consumer = consumer;
-            readEntriesCtx.epoch = epoch;
-            return readEntriesCtx;
-        }
-
-        Consumer getConsumer() {
-            return consumer;
-        }
-
-        long getEpoch() {
-            return epoch;
-        }
-
-        public void recycle() {
-            consumer = null;
-            epoch = 0;
-            recyclerHandle.recycle(this);
-        }
-    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 146ba4327d2..9bf3fc38088 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
@@ -35,17 +36,19 @@ public interface CompactedTopic {
      * Read entries from compacted topic.
      *
      * @deprecated Use {@link 
CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, 
ManagedCursor,
-     * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, 
ReadEntriesCallback, boolean, Consumer)}
+     * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, 
boolean)}
      * instead.
      */
     @Deprecated
-    void asyncReadEntriesOrWait(ManagedCursor cursor,
+    default void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int maxEntries,
                                 long bytesToRead,
                                 PositionImpl maxReadPosition,
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
-                                Consumer consumer);
+                                Consumer consumer) {
+        callback.readEntriesFailed(new ManagedLedgerException("deprecated"), 
null);
+    }
     CompletableFuture<Entry> readLastEntryOfCompactedLedger();
     Optional<Position> getCompactionHorizon();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index ddc4c08bfdd..33133f3df2a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pulsar.compaction;
 
-import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -39,17 +37,10 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.Consumer;
-import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
@@ -108,80 +99,6 @@ public class CompactedTopicImpl implements CompactedTopic {
         return tryDeleteCompactedLedger(bk, compactedLedgerId);
     }
 
-    @Override
-    @Deprecated
-    public void asyncReadEntriesOrWait(ManagedCursor cursor,
-                                       int maxEntries,
-                                       long bytesToRead,
-                                       PositionImpl maxReadPosition,
-                                       boolean isFirstRead,
-                                       ReadEntriesCallback callback, Consumer 
consumer) {
-            PositionImpl cursorPosition;
-            boolean readFromEarliest = isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId())
-                && (!cursor.isDurable() || 
cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
-                || cursor.getMarkDeletedPosition() == null
-                || cursor.getMarkDeletedPosition().getEntryId() == -1L);
-            if (readFromEarliest){
-                cursorPosition = PositionImpl.EARLIEST;
-            } else {
-                cursorPosition = (PositionImpl) cursor.getReadPosition();
-            }
-
-            // TODO: redeliver epoch link 
https://github.com/apache/pulsar/issues/13690
-            ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, 
DEFAULT_CONSUMER_EPOCH);
-
-            final PositionImpl currentCompactionHorizon = compactionHorizon;
-
-            if (currentCompactionHorizon == null
-                || currentCompactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, 
callback, readEntriesCtx, maxReadPosition);
-            } else {
-                ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
-                int numberOfEntriesToRead = 
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
-
-                compactedTopicContext.thenCompose(
-                    (context) -> findStartPoint(cursorPosition, 
context.ledger.getLastAddConfirmed(), context.cache)
-                        .thenCompose((startPoint) -> {
-                            // do not need to read the compaction ledger if it 
is empty.
-                            // the cursor just needs to be set to the 
compaction horizon
-                            if (startPoint == COMPACT_LEDGER_EMPTY || 
startPoint == NEWER_THAN_COMPACTED) {
-                                
cursor.seek(currentCompactionHorizon.getNext());
-                                
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
-                                return CompletableFuture.completedFuture(null);
-                            } else {
-                                long endPoint = 
Math.min(context.ledger.getLastAddConfirmed(),
-                                                         startPoint + 
(numberOfEntriesToRead - 1));
-                                return readEntries(context.ledger, startPoint, 
endPoint)
-                                    .thenAccept((entries) -> {
-                                        long entriesSize = 0;
-                                        for (Entry entry : entries) {
-                                            entriesSize += entry.getLength();
-                                        }
-                                        
managedCursor.updateReadStats(entries.size(), entriesSize);
-
-                                        Entry lastEntry = 
entries.get(entries.size() - 1);
-                                        // The compaction task depends on the 
last snapshot and the incremental
-                                        // entries to build the new snapshot. 
So for the compaction cursor, we
-                                        // need to force seek the read 
position to ensure the compactor can read
-                                        // the complete last snapshot because 
of the compactor will read the data
-                                        // before the compaction cursor mark 
delete position
-                                        
cursor.seek(lastEntry.getPosition().getNext(), true);
-                                        callback.readEntriesComplete(entries, 
readEntriesCtx);
-                                    });
-                            }
-                        }))
-                    .exceptionally((exception) -> {
-                        if (exception.getCause() instanceof 
NoSuchElementException) {
-                            cursor.seek(currentCompactionHorizon.getNext());
-                            
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
-                        } else {
-                            callback.readEntriesFailed(new 
ManagedLedgerException(exception), readEntriesCtx);
-                        }
-                        return null;
-                    });
-            }
-    }
-
     static CompletableFuture<Long> findStartPoint(PositionImpl p,
                                                   long lastEntryId,
                                                   AsyncLoadingCache<Long, 
MessageIdData> cache) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index 098bb248d04..c797593a390 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -19,36 +19,28 @@
 package org.apache.pulsar.compaction;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static 
org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntries;
+import static 
org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntriesWithSkipOrWait;
 import com.google.common.annotations.Beta;
-import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pulsar.broker.service.Consumer;
-import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.jspecify.annotations.Nullable;
 
 public class CompactedTopicUtils {
 
     @Beta
-    public static void asyncReadCompactedEntries(TopicCompactionService 
topicCompactionService,
-                                                 ManagedCursor cursor, int 
maxEntries,
-                                                 long bytesToRead, 
PositionImpl maxReadPosition,
-                                                 boolean readFromEarliest, 
AsyncCallbacks.ReadEntriesCallback callback,
-                                                 boolean wait, @Nullable 
Consumer consumer) {
+    public static CompletableFuture<List<Entry>> asyncReadCompactedEntries(
+            TopicCompactionService topicCompactionService, ManagedCursor 
cursor, int maxEntries,
+            long bytesToRead, PositionImpl maxReadPosition, boolean 
readFromEarliest, boolean wait) {
         Objects.requireNonNull(topicCompactionService);
         Objects.requireNonNull(cursor);
         checkArgument(maxEntries > 0);
-        Objects.requireNonNull(callback);
 
         final PositionImpl readPosition;
         if (readFromEarliest) {
@@ -57,37 +49,31 @@ public class CompactedTopicUtils {
             readPosition = (PositionImpl) cursor.getReadPosition();
         }
 
-        // TODO: redeliver epoch link 
https://github.com/apache/pulsar/issues/13690
-        PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx readEntriesCtx 
=
-                
PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.create(consumer, 
DEFAULT_CONSUMER_EPOCH);
-
         CompletableFuture<Position> lastCompactedPositionFuture = 
topicCompactionService.getLastCompactedPosition();
 
-        lastCompactedPositionFuture.thenCompose(lastCompactedPosition -> {
+        return lastCompactedPositionFuture.thenCompose(lastCompactedPosition 
-> {
             if (lastCompactedPosition == null
                     || readPosition.compareTo(
                     lastCompactedPosition.getLedgerId(), 
lastCompactedPosition.getEntryId()) > 0) {
                 if (wait) {
-                    cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, 
callback, readEntriesCtx, maxReadPosition);
+                    return readEntriesWithSkipOrWait(cursor, maxEntries, 
bytesToRead, maxReadPosition, null);
                 } else {
-                    cursor.asyncReadEntries(maxEntries, bytesToRead, callback, 
readEntriesCtx, maxReadPosition);
+                    return readEntries(cursor, maxEntries, bytesToRead, 
maxReadPosition);
                 }
-                return CompletableFuture.completedFuture(null);
             }
 
             ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
             int numberOfEntriesToRead = 
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
 
             return topicCompactionService.readCompactedEntries(readPosition, 
numberOfEntriesToRead)
-                    .thenAccept(entries -> {
+                    .thenApply(entries -> {
                         if (CollectionUtils.isEmpty(entries)) {
                             Position seekToPosition = 
lastCompactedPosition.getNext();
                             if 
(readPosition.compareTo(seekToPosition.getLedgerId(), 
seekToPosition.getEntryId()) > 0) {
                                 seekToPosition = readPosition;
                             }
                             cursor.seek(seekToPosition);
-                            
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
-                            return;
+                            return entries;
                         }
 
                         long entriesSize = 0;
@@ -98,12 +84,8 @@ public class CompactedTopicUtils {
 
                         Entry lastEntry = entries.get(entries.size() - 1);
                         cursor.seek(lastEntry.getPosition().getNext(), true);
-                        callback.readEntriesComplete(entries, readEntriesCtx);
+                        return entries;
                     });
-        }).exceptionally((exception) -> {
-            exception = FutureUtil.unwrapCompletionException(exception);
-            
callback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(exception),
 readEntriesCtx);
-            return null;
         });
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
index 88fe345334f..c83803d9b9f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -121,7 +122,7 @@ public class BrokerMessageDeduplicationTest {
         try {
             doAnswer(invocation -> {
                 throw new RuntimeException("asyncReadEntries failed");
-            }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any());
+            }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), 
any(), any());
             deduplication.checkStatus().get(3, TimeUnit.SECONDS);
             fail();
         } catch (ExecutionException e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
index a4c9e26ffb8..05b7242723e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -109,17 +111,18 @@ public class PersistentDispatcherSingleActiveConsumerTest 
extends ProducerConsum
 
         // Mock the readEntriesOrWait(...) to simulate the cursor is closed.
         Mockito.doAnswer(inv -> {
-            PersistentDispatcherSingleActiveConsumer dispatcher1 = 
inv.getArgument(2);
-            dispatcher1.readEntriesFailed(new 
ManagedLedgerException.CursorAlreadyClosedException("cursor closed"),
+            final var callback = (AsyncCallbacks.ReadEntriesCallback) 
inv.getArgument(2);
+            callback.readEntriesFailed(new 
ManagedLedgerException.CursorAlreadyClosedException("cursor closed"),
                     null);
             return null;
-        }).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), 
Mockito.anyLong(), Mockito.eq(dispatcher),
-                Mockito.any(), Mockito.any());
+        }).when(cursor).asyncReadEntriesWithSkipOrWait(Mockito.anyInt(), 
Mockito.anyLong(), Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any());
 
         dispatcher.readMoreEntries(consumer);
 
         // Verify: the readEntriesFailed should be called once and the 
scheduleReadEntriesWithDelay should not be called.
-        Assert.assertTrue(callReadEntriesFailed.get() == 1 && 
callScheduleReadEntriesWithDelayCnt.get() == 0);
+        Awaitility.await().untilAsserted(() -> 
Assert.assertTrue(callReadEntriesFailed.get() == 1
+                && callScheduleReadEntriesWithDelayCnt.get() == 0));
 
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
index 2545c0362e8..a31ab01d549 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -69,10 +69,8 @@ public class CompactedTopicUtilsTest {
             }
         };
 
-        CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
-                PositionImpl.LATEST, false, readEntriesCallback, false, null);
-
-        List<Entry> entries = completableFuture.get();
+        final var entries = 
CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
+                PositionImpl.LATEST, false, false).get();
         Assert.assertTrue(entries.isEmpty());
         Assert.assertNull(throwableRef.get());
         Assert.assertEquals(readPositionRef.get(), 
lastCompactedPosition.getNext());


Reply via email to