This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 65ddd44f2d6 [fix][broker] Prevent unexpected recycle failure in dispatcher's read callback (#24741) 65ddd44f2d6 is described below commit 65ddd44f2d6a04bd8adec2d82a44380a5a6c156b Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Thu Sep 18 11:13:21 2025 +0800 [fix][broker] Prevent unexpected recycle failure in dispatcher's read callback (#24741) (cherry picked from commit ccbd245aafb2b09d8f2e306de44b11134120c5c7) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +- .../mledger/util/ManagedLedgerUtils.java | 29 +++++- .../mledger/ManagedLedgerReplayTaskTest.java | 5 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 23 +++++ .../PersistentDispatcherSingleActiveConsumer.java | 102 ++++++--------------- .../apache/pulsar/compaction/CompactedTopic.java | 9 +- .../pulsar/compaction/CompactedTopicImpl.java | 83 ----------------- .../pulsar/compaction/CompactedTopicUtils.java | 42 +++------ .../broker/BrokerMessageDeduplicationTest.java | 3 +- ...rsistentDispatcherSingleActiveConsumerTest.java | 13 ++- .../pulsar/compaction/CompactedTopicUtilsTest.java | 6 +- 11 files changed, 112 insertions(+), 206 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 74b81153621..05e5e33df66 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -101,6 +101,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder; import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; +import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.util.FutureUtil; @@ -219,7 +220,7 @@ public class ManagedCursorImpl implements ManagedCursor { private boolean alwaysInactive = false; - private static final long NO_MAX_SIZE_LIMIT = -1L; + private static final long NO_MAX_SIZE_LIMIT = ManagedLedgerUtils.NO_MAX_SIZE_LIMIT; private long entriesReadCount; private long entriesReadSize; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java index e89e3ec349d..cbd43747cbb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerUtils.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.util; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -29,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.common.classification.InterfaceStability; +import org.jspecify.annotations.Nullable; /** * This util class contains some future-based methods to replace callback-based APIs. With a callback-based API, if any @@ -40,6 +42,8 @@ import org.apache.pulsar.common.classification.InterfaceStability; @InterfaceStability.Evolving public class ManagedLedgerUtils { + public static final long NO_MAX_SIZE_LIMIT = -1L; + public static CompletableFuture<ManagedCursor> openCursor(ManagedLedger ml, String cursorName) { final var future = new CompletableFuture<ManagedCursor>(); ml.asyncOpenCursor(cursorName, new AsyncCallbacks.OpenCursorCallback() { @@ -58,8 +62,13 @@ public class ManagedLedgerUtils { public static CompletableFuture<List<Entry>> readEntries(ManagedCursor cursor, int numberOfEntriesToRead, Position maxPosition) { + return readEntries(cursor, numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, maxPosition); + } + + public static CompletableFuture<List<Entry>> readEntries(ManagedCursor cursor, int numberOfEntriesToRead, + long maxBytes, Position maxPosition) { final var future = new CompletableFuture<List<Entry>>(); - cursor.asyncReadEntries(numberOfEntriesToRead, new AsyncCallbacks.ReadEntriesCallback() { + cursor.asyncReadEntries(numberOfEntriesToRead, maxBytes, new AsyncCallbacks.ReadEntriesCallback() { @Override public void readEntriesComplete(List<Entry> entries, Object ctx) { future.complete(entries); @@ -73,6 +82,24 @@ public class ManagedLedgerUtils { return future; } + public static CompletableFuture<List<Entry>> readEntriesWithSkipOrWait( + ManagedCursor cursor, int maxEntries, long maxSizeBytes, PositionImpl maxPosition, + @Nullable Predicate<PositionImpl> skipCondition) { + final var future = new CompletableFuture<List<Entry>>(); + cursor.asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List<Entry> entries, Object ctx) { + future.complete(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null, maxPosition, skipCondition); + return future; + } + public static CompletableFuture<Void> markDelete(ManagedCursor cursor, Position position, Map<String, Long> properties) { final var future = new CompletableFuture<Void>(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java index 56c827b2166..177b6773058 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/ManagedLedgerReplayTaskTest.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -128,11 +129,11 @@ public class ManagedLedgerReplayTaskTest extends MockedBookKeeperTestCase { } }).when(cursor).hasMoreEntries(); doAnswer(invocation -> { - final var callback = (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(1); + final var callback = (AsyncCallbacks.ReadEntriesCallback) invocation.getArgument(2); final var entries = List.<Entry>of(EntryImpl.create(1, 1, "msg".getBytes())); callback.readEntriesComplete(entries, null); return null; - }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); + }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(), any()); try { replayTask.replay(cursor, (__, ___) -> { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 3d258c9ec7f..4683b5f6759 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -102,6 +102,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; +import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.collections.iterators.EmptyIterator; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -5317,5 +5318,27 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { }).toList(), IntStream.range(0, 10).mapToObj(i -> "msg-" + i).toList()); } + @Test + public void testConcurrentRead() throws Exception { + final var config = new ManagedLedgerConfig(); + config.setReadEntryTimeoutSeconds(0); + config.setNewEntriesCheckDelayInMillis(1000); + @Cleanup final var ledger = factory.open("testConcurrentRead", config); + final var cursor = ledger.openCursor("cursor"); + final var future1 = ManagedLedgerUtils.readEntriesWithSkipOrWait(cursor, 10, Integer.MAX_VALUE, + PositionImpl.LATEST, null); + final var future2 = ManagedLedgerUtils.readEntriesWithSkipOrWait(cursor, 10, Integer.MAX_VALUE, + PositionImpl.LATEST, null); + assertTrue(future2.isCompletedExceptionally()); + try { + future2.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ManagedLedgerException.ConcurrentWaitCallbackException); + } + ledger.addEntry("msg".getBytes()); + assertEquals(future1.get(2, TimeUnit.SECONDS).get(0).getData(), "msg".getBytes()); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index bb8843b9397..1b61ac3923e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -18,20 +18,19 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntriesWithSkipOrWait; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import com.google.common.annotations.VisibleForTesting; -import io.netty.util.Recycler; import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -54,14 +53,14 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.compaction.CompactedTopicUtils; import org.apache.pulsar.compaction.Compactor; -import org.apache.pulsar.compaction.TopicCompactionService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer - implements Dispatcher, ReadEntriesCallback { + implements Dispatcher { private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false); protected final PersistentTopic topic; @@ -154,16 +153,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher } } - @Override - public void readEntriesComplete(final List<Entry> entries, Object obj) { - executor.execute(() -> internalReadEntriesComplete(entries, obj)); - } - - private synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) { - ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) obj; - Consumer readConsumer = readEntriesCtx.getConsumer(); - long epoch = readEntriesCtx.getEpoch(); - readEntriesCtx.recycle(); + private synchronized void readEntriesComplete(List<Entry> entries, Consumer readConsumer, long epoch) { if (log.isDebugEnabled()) { log.debug("[{}-{}] Got messages: {}", name, readConsumer, entries.size()); } @@ -376,19 +366,27 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead); } havePendingRead = true; + // TODO: should we pass the consumer epoch for compacted read path? See + // https://github.com/apache/pulsar/issues/13690 + final var epoch = consumer.readCompacted() ? DEFAULT_CONSUMER_EPOCH : consumer.getConsumerEpoch(); + final CompletableFuture<List<Entry>> entriesFuture; if (consumer.readCompacted()) { boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId()) && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION) || hasValidMarkDeletePosition(cursor)); - TopicCompactionService topicCompactionService = topic.getTopicCompactionService(); - CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead, - bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer); + entriesFuture = CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), + cursor, messagesToRead, bytesToRead, topic.getMaxReadPosition(), readFromEarliest, true); } else { - ReadEntriesCtx readEntriesCtx = - ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch()); - cursor.asyncReadEntriesOrWait(messagesToRead, - bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition()); + entriesFuture = readEntriesWithSkipOrWait(cursor, messagesToRead, bytesToRead, + topic.getMaxReadPosition(), null); } + entriesFuture.whenCompleteAsync((entries, e) -> { + if (e == null) { + readEntriesComplete(entries, consumer, epoch); + } else { + readEntriesFailed(e, consumer); + } + }, executor); } } else { if (log.isDebugEnabled()) { @@ -458,16 +456,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher return Pair.of(messagesToRead, bytesToRead); } - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - executor.execute(() -> internalReadEntriesFailed(exception, ctx)); - } - - private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) { + @VisibleForTesting + public synchronized void readEntriesFailed(Throwable throwable, Consumer consumer) { havePendingRead = false; - ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) ctx; - Consumer c = readEntriesCtx.getConsumer(); - readEntriesCtx.recycle(); + final var exception = FutureUtil.unwrapCompletionException(throwable); // Do not keep reading messages from a closed cursor. if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) { @@ -499,21 +491,19 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher exception.getMessage(), waitTimeMillis / 1000.0); } } else if (!(exception instanceof TooManyRequestsException)) { - log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", name, c, + log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", name, consumer, cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0); } else { if (log.isDebugEnabled()) { log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", - name, c, cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0); + name, consumer, cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0); } } - Objects.requireNonNull(c); - // Reduce read batch size to avoid flooding bookies with retries readBatchSize = serviceConfig.getDispatcherMinReadBatchSize(); - scheduleReadEntriesWithDelay(c, waitTimeMillis); + scheduleReadEntriesWithDelay(consumer, waitTimeMillis); } @VisibleForTesting @@ -586,44 +576,4 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher } private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); - - public static class ReadEntriesCtx { - - private Consumer consumer; - private long epoch; - - private final Recycler.Handle<ReadEntriesCtx> recyclerHandle; - - private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } - private static final Recycler<ReadEntriesCtx> RECYCLER = - new Recycler<ReadEntriesCtx>() { - @Override - protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> recyclerHandle) { - return new ReadEntriesCtx(recyclerHandle); - } - }; - - public static ReadEntriesCtx create(Consumer consumer, long epoch) { - ReadEntriesCtx readEntriesCtx = RECYCLER.get(); - readEntriesCtx.consumer = consumer; - readEntriesCtx.epoch = epoch; - return readEntriesCtx; - } - - Consumer getConsumer() { - return consumer; - } - - long getEpoch() { - return epoch; - } - - public void recycle() { - consumer = null; - epoch = 0; - recyclerHandle.recycle(this); - } - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 146ba4327d2..9bf3fc38088 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Consumer; @@ -35,17 +36,19 @@ public interface CompactedTopic { * Read entries from compacted topic. * * @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor, - * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, ReadEntriesCallback, boolean, Consumer)} + * int, long, org.apache.bookkeeper.mledger.PositionImpl, boolean, boolean)} * instead. */ @Deprecated - void asyncReadEntriesOrWait(ManagedCursor cursor, + default void asyncReadEntriesOrWait(ManagedCursor cursor, int maxEntries, long bytesToRead, PositionImpl maxReadPosition, boolean isFirstRead, ReadEntriesCallback callback, - Consumer consumer); + Consumer consumer) { + callback.readEntriesFailed(new ManagedLedgerException("deprecated"), null); + } CompletableFuture<Entry> readLastEntryOfCompactedLedger(); Optional<Position> getCompactionHorizon(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index ddc4c08bfdd..33133f3df2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -18,14 +18,12 @@ */ package org.apache.pulsar.compaction; -import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; import io.netty.buffer.ByteBuf; import java.util.ArrayList; -import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.NoSuchElementException; @@ -39,17 +37,10 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -108,80 +99,6 @@ public class CompactedTopicImpl implements CompactedTopic { return tryDeleteCompactedLedger(bk, compactedLedgerId); } - @Override - @Deprecated - public void asyncReadEntriesOrWait(ManagedCursor cursor, - int maxEntries, - long bytesToRead, - PositionImpl maxReadPosition, - boolean isFirstRead, - ReadEntriesCallback callback, Consumer consumer) { - PositionImpl cursorPosition; - boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId()) - && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION) - || cursor.getMarkDeletedPosition() == null - || cursor.getMarkDeletedPosition().getEntryId() == -1L); - if (readFromEarliest){ - cursorPosition = PositionImpl.EARLIEST; - } else { - cursorPosition = (PositionImpl) cursor.getReadPosition(); - } - - // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690 - ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH); - - final PositionImpl currentCompactionHorizon = compactionHorizon; - - if (currentCompactionHorizon == null - || currentCompactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); - } else { - ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor; - int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead); - - compactedTopicContext.thenCompose( - (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) - .thenCompose((startPoint) -> { - // do not need to read the compaction ledger if it is empty. - // the cursor just needs to be set to the compaction horizon - if (startPoint == COMPACT_LEDGER_EMPTY || startPoint == NEWER_THAN_COMPACTED) { - cursor.seek(currentCompactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx); - return CompletableFuture.completedFuture(null); - } else { - long endPoint = Math.min(context.ledger.getLastAddConfirmed(), - startPoint + (numberOfEntriesToRead - 1)); - return readEntries(context.ledger, startPoint, endPoint) - .thenAccept((entries) -> { - long entriesSize = 0; - for (Entry entry : entries) { - entriesSize += entry.getLength(); - } - managedCursor.updateReadStats(entries.size(), entriesSize); - - Entry lastEntry = entries.get(entries.size() - 1); - // The compaction task depends on the last snapshot and the incremental - // entries to build the new snapshot. So for the compaction cursor, we - // need to force seek the read position to ensure the compactor can read - // the complete last snapshot because of the compactor will read the data - // before the compaction cursor mark delete position - cursor.seek(lastEntry.getPosition().getNext(), true); - callback.readEntriesComplete(entries, readEntriesCtx); - }); - } - })) - .exceptionally((exception) -> { - if (exception.getCause() instanceof NoSuchElementException) { - cursor.seek(currentCompactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx); - } else { - callback.readEntriesFailed(new ManagedLedgerException(exception), readEntriesCtx); - } - return null; - }); - } - } - static CompletableFuture<Long> findStartPoint(PositionImpl p, long lastEntryId, AsyncLoadingCache<Long, MessageIdData> cache) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java index 098bb248d04..c797593a390 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java @@ -19,36 +19,28 @@ package org.apache.pulsar.compaction; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntries; +import static org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntriesWithSkipOrWait; import com.google.common.annotations.Beta; -import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; -import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; -import org.apache.pulsar.common.util.FutureUtil; -import org.jspecify.annotations.Nullable; public class CompactedTopicUtils { @Beta - public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService, - ManagedCursor cursor, int maxEntries, - long bytesToRead, PositionImpl maxReadPosition, - boolean readFromEarliest, AsyncCallbacks.ReadEntriesCallback callback, - boolean wait, @Nullable Consumer consumer) { + public static CompletableFuture<List<Entry>> asyncReadCompactedEntries( + TopicCompactionService topicCompactionService, ManagedCursor cursor, int maxEntries, + long bytesToRead, PositionImpl maxReadPosition, boolean readFromEarliest, boolean wait) { Objects.requireNonNull(topicCompactionService); Objects.requireNonNull(cursor); checkArgument(maxEntries > 0); - Objects.requireNonNull(callback); final PositionImpl readPosition; if (readFromEarliest) { @@ -57,37 +49,31 @@ public class CompactedTopicUtils { readPosition = (PositionImpl) cursor.getReadPosition(); } - // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690 - PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx readEntriesCtx = - PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH); - CompletableFuture<Position> lastCompactedPositionFuture = topicCompactionService.getLastCompactedPosition(); - lastCompactedPositionFuture.thenCompose(lastCompactedPosition -> { + return lastCompactedPositionFuture.thenCompose(lastCompactedPosition -> { if (lastCompactedPosition == null || readPosition.compareTo( lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) { if (wait) { - cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); + return readEntriesWithSkipOrWait(cursor, maxEntries, bytesToRead, maxReadPosition, null); } else { - cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); + return readEntries(cursor, maxEntries, bytesToRead, maxReadPosition); } - return CompletableFuture.completedFuture(null); } ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor; int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead); return topicCompactionService.readCompactedEntries(readPosition, numberOfEntriesToRead) - .thenAccept(entries -> { + .thenApply(entries -> { if (CollectionUtils.isEmpty(entries)) { Position seekToPosition = lastCompactedPosition.getNext(); if (readPosition.compareTo(seekToPosition.getLedgerId(), seekToPosition.getEntryId()) > 0) { seekToPosition = readPosition; } cursor.seek(seekToPosition); - callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx); - return; + return entries; } long entriesSize = 0; @@ -98,12 +84,8 @@ public class CompactedTopicUtils { Entry lastEntry = entries.get(entries.size() - 1); cursor.seek(lastEntry.getPosition().getNext(), true); - callback.readEntriesComplete(entries, readEntriesCtx); + return entries; }); - }).exceptionally((exception) -> { - exception = FutureUtil.unwrapCompletionException(exception); - callback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(exception), readEntriesCtx); - return null; }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java index 88fe345334f..c83803d9b9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerMessageDeduplicationTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -121,7 +122,7 @@ public class BrokerMessageDeduplicationTest { try { doAnswer(invocation -> { throw new RuntimeException("asyncReadEntries failed"); - }).when(cursor).asyncReadEntries(anyInt(), any(), any(), any()); + }).when(cursor).asyncReadEntries(anyInt(), anyLong(), any(), any(), any()); deduplication.checkStatus().get(3, TimeUnit.SECONDS); fail(); } catch (ExecutionException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java index a4c9e26ffb8..05b7242723e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; @@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -109,17 +111,18 @@ public class PersistentDispatcherSingleActiveConsumerTest extends ProducerConsum // Mock the readEntriesOrWait(...) to simulate the cursor is closed. Mockito.doAnswer(inv -> { - PersistentDispatcherSingleActiveConsumer dispatcher1 = inv.getArgument(2); - dispatcher1.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("cursor closed"), + final var callback = (AsyncCallbacks.ReadEntriesCallback) inv.getArgument(2); + callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("cursor closed"), null); return null; - }).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), Mockito.eq(dispatcher), - Mockito.any(), Mockito.any()); + }).when(cursor).asyncReadEntriesWithSkipOrWait(Mockito.anyInt(), Mockito.anyLong(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any()); dispatcher.readMoreEntries(consumer); // Verify: the readEntriesFailed should be called once and the scheduleReadEntriesWithDelay should not be called. - Assert.assertTrue(callReadEntriesFailed.get() == 1 && callScheduleReadEntriesWithDelayCnt.get() == 0); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(callReadEntriesFailed.get() == 1 + && callScheduleReadEntriesWithDelayCnt.get() == 0)); // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java index 2545c0362e8..a31ab01d549 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java @@ -69,10 +69,8 @@ public class CompactedTopicUtilsTest { } }; - CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, - PositionImpl.LATEST, false, readEntriesCallback, false, null); - - List<Entry> entries = completableFuture.get(); + final var entries = CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, + PositionImpl.LATEST, false, false).get(); Assert.assertTrue(entries.isEmpty()); Assert.assertNull(throwableRef.get()); Assert.assertEquals(readPositionRef.get(), lastCompactedPosition.getNext());