jerrypeng opened a new pull request #9206:
URL: https://github.com/apache/pulsar/pull/9206
### Motivation
Exceptions are thrown in the broker when when reading from a compacted topic
where all keys are marked as deleted. Clients will not be able to make
progress at that point. This can be reproduced in the following manner:
1. Publish x number of key value pairs
2. Publish the same x number of keys but with no value (mark them for
deletion)
3. Compact the topic
4. Read compacted from that topic.
Exception:
`23:11:20.805 [broker-topic-workers-OrderedScheduler-3-0] WARN
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache - Exception thrown
during asynchronous load
org.apache.bookkeeper.client.BKException$BKReadException: Error while
reading ledger
at org.apache.bookkeeper.client.BKException.create(BKException.java:62)
~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
at
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$readOneMessageId$9(CompactedTopicImpl.java:181)
~[pulsar-broker.jar:2.7.0]
at
org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:690)
~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
at
org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:177)
~[pulsar-broker.jar:2.7.0]
at
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$8(CompactedTopicImpl.java:168)
~[pulsar-broker.jar:2.7.0]
at
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.lambda$get$2(LocalAsyncLoadingCache.java:129)
~[caffeine-2.6.2.jar:?]
at
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039)
~[caffeine-2.6.2.jar:?]
at
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
~[?:1.8.0_231]
at
com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037)
~[caffeine-2.6.2.jar:?]
at
com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020)
~[caffeine-2.6.2.jar:?]
at
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:128)
~[caffeine-2.6.2.jar:?]
at
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:119)
~[caffeine-2.6.2.jar:?]
at
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:158)
~[caffeine-2.6.2.jar:?]
at
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:144)
~[pulsar-broker.jar:2.7.0]
at
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:132)
~[pulsar-broker.jar:2.7.0]
at
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$4(CompactedTopicImpl.java:93)
~[pulsar-broker.jar:2.7.0]
at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
~[?:1.8.0_231]
at
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
~[?:1.8.0_231]
at
org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:92)
~[pulsar-broker.jar:2.7.0]
at
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:483)
~[pulsar-broker.jar:2.7.0]
at
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$null$11(PersistentDispatcherSingleActiveConsumer.java:549)
~[pulsar-broker.jar:2.7.0]
at
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
[managed-ledger.jar:2.7.0]
at
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
[bookkeeper-common-4.11.0.SPLK.10278328.jar:?]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_231]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_231]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_231]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_231]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_231]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_231]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[netty-common-4.1.51.Final.jar:4.1.51.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
`
### Modifications
Handle correctly the scenario where compact ledger is empty
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]