jerrypeng opened a new pull request #9206:
URL: https://github.com/apache/pulsar/pull/9206


   ### Motivation
   
   Exceptions are thrown in the broker when when reading from a compacted topic 
where all keys are marked as deleted.  Clients will not be able to make 
progress at that point. This can be reproduced in the following manner:
   
   1. Publish x number of key value pairs
   2. Publish the same x number of keys but with no value (mark them for 
deletion)
   3. Compact the topic
   4. Read compacted from that topic.
   
   Exception:
   
   `23:11:20.805 [broker-topic-workers-OrderedScheduler-3-0] WARN  
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache - Exception thrown 
during asynchronous load
   org.apache.bookkeeper.client.BKException$BKReadException: Error while 
reading ledger
        at org.apache.bookkeeper.client.BKException.create(BKException.java:62) 
~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$readOneMessageId$9(CompactedTopicImpl.java:181)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:690)
 ~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:177)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$8(CompactedTopicImpl.java:168)
 ~[pulsar-broker.jar:2.7.0]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.lambda$get$2(LocalAsyncLoadingCache.java:129)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039)
 ~[caffeine-2.6.2.jar:?]
        at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) 
~[?:1.8.0_231]
        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:128)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:119)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:158)
 ~[caffeine-2.6.2.jar:?]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:144)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:132)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$4(CompactedTopicImpl.java:93)
 ~[pulsar-broker.jar:2.7.0]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
 ~[?:1.8.0_231]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) 
~[?:1.8.0_231]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:92)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:483)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$null$11(PersistentDispatcherSingleActiveConsumer.java:549)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
[managed-ledger.jar:2.7.0]
        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
[bookkeeper-common-4.11.0.SPLK.10278328.jar:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_231]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_231]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_231]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_231]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_231]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_231]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
   `
   
   ### Modifications
   
   Handle correctly the scenario where compact ledger is empty
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to