This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43ab3c460307f5bc4f909b343281c36604437320
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Jan 20 14:36:48 2021 -0800

    Fix issue with topic compaction when compaction ledger is empty (#9206)
    
    ### Motivation
    
    Exceptions are thrown in the broker when when reading from a compacted 
topic where all keys are marked as deleted.  Clients will not be able to make 
progress at that point. This can be reproduced in the following manner:
    
    1. Publish x number of key value pairs
    2. Publish the same x number of keys but with no value (mark them for 
deletion)
    3. Compact the topic
    4. Read compacted from that topic.
    
    Exception:
    
    `23:11:20.805 [broker-topic-workers-OrderedScheduler-3-0] WARN  
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache - Exception thrown 
during asynchronous load
    org.apache.bookkeeper.client.BKException$BKReadException: Error while 
reading ledger
        at org.apache.bookkeeper.client.BKException.create(BKException.java:62) 
~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$readOneMessageId$9(CompactedTopicImpl.java:181)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:690)
 ~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:177)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$8(CompactedTopicImpl.java:168)
 ~[pulsar-broker.jar:2.7.0]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.lambda$get$2(LocalAsyncLoadingCache.java:129)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039)
 ~[caffeine-2.6.2.jar:?]
        at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) 
~[?:1.8.0_231]
        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:128)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:119)
 ~[caffeine-2.6.2.jar:?]
        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:158)
 ~[caffeine-2.6.2.jar:?]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:144)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:132)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$4(CompactedTopicImpl.java:93)
 ~[pulsar-broker.jar:2.7.0]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
 ~[?:1.8.0_231]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) 
~[?:1.8.0_231]
        at 
org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:92)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:483)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$null$11(PersistentDispatcherSingleActiveConsumer.java:549)
 ~[pulsar-broker.jar:2.7.0]
        at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
[managed-ledger.jar:2.7.0]
        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
[bookkeeper-common-4.11.0.SPLK.10278328.jar:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_231]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_231]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_231]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_231]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_231]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_231]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
    `
    
    ### Modifications
    
    Handle correctly the scenario where compact ledger is empty
    
    
    
    (cherry picked from commit f0929bee69f99264a829522a3bda9fd7748f0f75)
---
 .../org/apache/pulsar/compaction/CompactedTopicImpl.java  | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 2739269..d30098a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
     final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+    final static long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL;
     final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100;
 
     private final BookKeeper bk;
@@ -91,6 +92,13 @@ public class CompactedTopicImpl implements CompactedTopic {
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, 
context.ledger.getLastAddConfirmed(), context.cache)
                         .thenCompose((startPoint) -> {
+                            // do not need to read the compaction ledger if it 
is empty.
+                            // the cursor just needs to be set to the 
compaction horizon
+                            if (startPoint == COMPACT_LEDGER_EMPTY) {
+                                cursor.seek(compactionHorizon.getNext());
+                                
callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                return CompletableFuture.completedFuture(null);
+                            }
                             if (startPoint == NEWER_THAN_COMPACTED && 
compactionHorizon.compareTo(cursorPosition) < 0) {
                                 
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
                                 return CompletableFuture.completedFuture(null);
@@ -126,7 +134,12 @@ public class CompactedTopicImpl implements CompactedTopic {
                                                   long lastEntryId,
                                                   
AsyncLoadingCache<Long,MessageIdData> cache) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        findStartPointLoop(p, 0, lastEntryId, promise, cache);
+        // if lastEntryId is less than zero it means there are no entries in 
the compact ledger
+        if (lastEntryId < 0) {
+            promise.complete(COMPACT_LEDGER_EMPTY);
+        } else {
+            findStartPointLoop(p, 0, lastEntryId, promise, cache);
+        }
         return promise;
     }
 

Reply via email to