This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 43ab3c460307f5bc4f909b343281c36604437320 Author: Boyang Jerry Peng <[email protected]> AuthorDate: Wed Jan 20 14:36:48 2021 -0800 Fix issue with topic compaction when compaction ledger is empty (#9206) ### Motivation Exceptions are thrown in the broker when when reading from a compacted topic where all keys are marked as deleted. Clients will not be able to make progress at that point. This can be reproduced in the following manner: 1. Publish x number of key value pairs 2. Publish the same x number of keys but with no value (mark them for deletion) 3. Compact the topic 4. Read compacted from that topic. Exception: `23:11:20.805 [broker-topic-workers-OrderedScheduler-3-0] WARN com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache - Exception thrown during asynchronous load org.apache.bookkeeper.client.BKException$BKReadException: Error while reading ledger at org.apache.bookkeeper.client.BKException.create(BKException.java:62) ~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?] at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$readOneMessageId$9(CompactedTopicImpl.java:181) ~[pulsar-broker.jar:2.7.0] at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:690) ~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?] at org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:177) ~[pulsar-broker.jar:2.7.0] at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$8(CompactedTopicImpl.java:168) ~[pulsar-broker.jar:2.7.0] at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.lambda$get$2(LocalAsyncLoadingCache.java:129) ~[caffeine-2.6.2.jar:?] at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039) ~[caffeine-2.6.2.jar:?] at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) ~[?:1.8.0_231] at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037) ~[caffeine-2.6.2.jar:?] at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020) ~[caffeine-2.6.2.jar:?] at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:128) ~[caffeine-2.6.2.jar:?] at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:119) ~[caffeine-2.6.2.jar:?] at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:158) ~[caffeine-2.6.2.jar:?] at org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:144) ~[pulsar-broker.jar:2.7.0] at org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:132) ~[pulsar-broker.jar:2.7.0] at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$4(CompactedTopicImpl.java:93) ~[pulsar-broker.jar:2.7.0] at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) ~[?:1.8.0_231] at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) ~[?:1.8.0_231] at org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:92) ~[pulsar-broker.jar:2.7.0] at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:483) ~[pulsar-broker.jar:2.7.0] at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$null$11(PersistentDispatcherSingleActiveConsumer.java:549) ~[pulsar-broker.jar:2.7.0] at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger.jar:2.7.0] at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.11.0.SPLK.10278328.jar:?] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_231] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_231] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_231] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_231] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231] ` ### Modifications Handle correctly the scenario where compact ledger is empty (cherry picked from commit f0929bee69f99264a829522a3bda9fd7748f0f75) --- .../org/apache/pulsar/compaction/CompactedTopicImpl.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 2739269..d30098a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory; public class CompactedTopicImpl implements CompactedTopic { final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL; + final static long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL; final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100; private final BookKeeper bk; @@ -91,6 +92,13 @@ public class CompactedTopicImpl implements CompactedTopic { compactedTopicContext.thenCompose( (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) .thenCompose((startPoint) -> { + // do not need to read the compaction ledger if it is empty. + // the cursor just needs to be set to the compaction horizon + if (startPoint == COMPACT_LEDGER_EMPTY) { + cursor.seek(compactionHorizon.getNext()); + callback.readEntriesComplete(Collections.emptyList(), ctx); + return CompletableFuture.completedFuture(null); + } if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) { cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx); return CompletableFuture.completedFuture(null); @@ -126,7 +134,12 @@ public class CompactedTopicImpl implements CompactedTopic { long lastEntryId, AsyncLoadingCache<Long,MessageIdData> cache) { CompletableFuture<Long> promise = new CompletableFuture<>(); - findStartPointLoop(p, 0, lastEntryId, promise, cache); + // if lastEntryId is less than zero it means there are no entries in the compact ledger + if (lastEntryId < 0) { + promise.complete(COMPACT_LEDGER_EMPTY); + } else { + findStartPointLoop(p, 0, lastEntryId, promise, cache); + } return promise; }
