TakaHiR07 opened a new issue, #21551:
URL: https://github.com/apache/pulsar/issues/21551

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   branch-2.9 (the master branch probably has the same problem)
   
   ### Minimal reproduce step
   
   Steps to reproduce:
   1. Take a `__change_events` policy topic whose compactedLedger 1705537 has its quorum of 3 bookies on bookie-1, bookie-2, bookie-3.
   2. Shut down bookie-1, bookie-2 and bookie-3 one by one, which triggers bookie autoRecovery.
   3. The quorum of compactedLedger 1705537 becomes bookie-4, bookie-5, bookie-6.
   4. Use the pulsar-admin API to setMessageTTL, which triggers a message write to the `__change_events` topic.
   5. The broker log shows many errors: the compaction triggered in BrokerService#checkCompaction() always fails.
   
   The error log is as follows:
   ```
   14:49:15.197 [compaction-76-1] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://public/default/__change_events-partition-2][__compaction] Get 
topic last message Id
                14:49:15.197 [broker-topic-workers-OrderedExecutor-1-0] ERROR 
org.apache.bookkeeper.proto.PerChannelBookieClient - Could not connect to 
bookie: null/bookie-1, current state CONNECTING : 
                
org.apache.bookkeeper.proto.BookieAddressResolver$BookieIdNotResolvedException: 
Cannot resolve bookieId bookie-1, bookie does not exist or it is not running
                        at 
org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:67)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.PerChannelBookieClient.connect(PerChannelBookieClient.java:531)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.PerChannelBookieClient.connectIfNeededAndDoOp(PerChannelBookieClient.java:657)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.obtain(DefaultPerChannelBookieClientPool.java:121)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.obtain(DefaultPerChannelBookieClientPool.java:116)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.BookieClientImpl.readEntry(BookieClientImpl.java:509)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.BookieClientImpl.readEntry(BookieClientImpl.java:495)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.proto.BookieClientImpl.readEntry(BookieClientImpl.java:489)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.PendingReadOp.sendReadTo(PendingReadOp.java:576) 
~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.PendingReadOp$SequenceReadRequest.sendNextRead(PendingReadOp.java:405)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.PendingReadOp$SequenceReadRequest.read(PendingReadOp.java:386)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.PendingReadOp.initiate(PendingReadOp.java:530) 
~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.PendingReadOp.safeRun(PendingReadOp.java:540) 
~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.LedgerHandle.readEntriesInternalAsync(LedgerHandle.java:896)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.LedgerHandle.asyncReadEntriesInternal(LedgerHandle.java:800)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:694)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:224)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$7(CompactedTopicImpl.java:215)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
com.github.benmanes.caffeine.cache.LocalAsyncCache.lambda$get$2(LocalAsyncCache.java:92)
 ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2405)
 ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908) 
~[?:?]
                        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2403)
 ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2386)
 ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
com.github.benmanes.caffeine.cache.LocalAsyncCache.get(LocalAsyncCache.java:91) 
~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
com.github.benmanes.caffeine.cache.LocalAsyncCache.get(LocalAsyncCache.java:82) 
~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:79)
 ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
                        at 
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:189)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:179)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$3(CompactedTopicImpl.java:106)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1106)
 ~[?:?]
                        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) 
~[?:?]
                        at 
org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:105)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:343)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalConsumerFlow(PersistentDispatcherSingleActiveConsumer.java:280)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$consumerFlow$5(PersistentDispatcherSingleActiveConsumer.java:256)
 ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
~[org.apache.pulsar-managed-ledger-2.9.5.4.jar:2.9.5.4]
                        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:204)
 ~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
                        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
                        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
                        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
                        at java.lang.Thread.run(Thread.java:834) ~[?:?]
                Caused by: 
org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: 
Bookie handle is not available
                        at 
org.apache.bookkeeper.discover.ZKRegistrationClient.getBookieServiceInfo(ZKRegistrationClient.java:248)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        at 
org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:47)
 ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
                        ... 42 more
                14:49:15.198 [BookKeeperClientWorker-OrderedExecutor-1-0] ERROR 
org.apache.bookkeeper.client.PendingReadOp - Read of ledger entry failed: 
L1705537 E0-E0, Sent to [bookie-1, bookie-2, bookie-3], Heard from [] : bitset 
= {}, Error = 'Bookie handle is not available'. First unread entry is (-1, rc = 
null)
                14:49:15.199 [broker-topic-workers-OrderedExecutor-1-0] ERROR 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer
 - [persistent://public/default/__change_events-partition-2 / 
__compaction-Consumer{subscription=CompactorSubscription{topic=persistent://public/default/__change_events-partition-2,
 name=__compaction}, consumerId=1011, consumerName=1bdee, address=/xxx}] Error 
reading entries at 1705285:48 : java.util.concurrent.CompletionException: 
org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: 
Bookie handle is not available - Retrying to read in 54.759 seconds
                14:49:15.199 [pulsar-io-32-26] INFO  
org.apache.pulsar.client.impl.ConsumerImpl - 
[persistent://public/default/__change_events-partition-2][__compaction] 
Successfully getLastMessageId 1705285:48
                14:49:15.199 [pulsar-io-32-26] INFO  
org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of 
compaction for persistent://public/default/__change_events-partition-2, reading 
to 1705285:48:2
   ```
   
   We found that the cause is that reading compacted ledger 1705537 fails during TwoPhaseCompaction.
   
   TwoPhaseCompaction runs the following steps:
   1. Check whether the topic has messages via reader.hasMessageAvailableAsync().
   2. Run phase one. This calls getLastMessageId and then triggers phaseOneLoop().
   3. Run phase two. This creates a new compactedLedger, compacts all keys into it, and triggers phaseTwoSeekThenLoop().
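
To make the three steps concrete, here is a minimal in-memory sketch. It is not the Pulsar implementation: the `Message` class, the use of a list index as the message ID, and the assumption that compaction keeps only the newest message per key are all simplifications made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified in-memory model of the compaction steps; not the Pulsar implementation. */
class TwoPhaseCompactionSketch {

    /** Hypothetical keyed message; its index in the topic list plays the role of the message ID. */
    static final class Message {
        final String key;
        final String value;

        Message(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Phase one: scan up to lastIndex and remember the newest position of every key. */
    static Map<String, Integer> phaseOne(List<Message> topic, int lastIndex) {
        Map<String, Integer> latest = new HashMap<>();
        for (int i = 0; i <= lastIndex; i++) {
            latest.put(topic.get(i).key, i);
        }
        return latest;
    }

    /** Phase two: re-read from the start and copy only the newest message of each key. */
    static List<Message> phaseTwo(List<Message> topic, Map<String, Integer> latest, int lastIndex) {
        List<Message> compacted = new ArrayList<>(); // stands in for the new compactedLedger
        for (int i = 0; i <= lastIndex; i++) {
            Message m = topic.get(i);
            if (latest.get(m.key) == i) {
                compacted.add(m);
            }
        }
        return compacted;
    }

    static List<Message> compact(List<Message> topic) {
        if (topic.isEmpty()) {
            return Collections.emptyList(); // step 1: nothing available, nothing to compact
        }
        int lastIndex = topic.size() - 1;   // step 2: the "last message ID" bounds both phases
        Map<String, Integer> latest = phaseOne(topic, lastIndex);
        return phaseTwo(topic, latest, lastIndex); // step 3
    }

    public static void main(String[] args) {
        List<Message> topic = Arrays.asList(
                new Message("ttl", "60"), new Message("retention", "1d"), new Message("ttl", "120"));
        for (Message m : compact(topic)) {
            System.out.println(m.key + "=" + m.value);
        }
    }
}
```

In this model phase one only reads. The failure reported in this issue happens at that stage, before any new compactedLedger is written.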
   
   In phaseOneLoop(), the compactor tries to read messages, which triggers CompactedTopicImpl#asyncReadEntriesOrWait(). That method then uses compactedTopicContext to findStartPoint. compactedTopicContext is kept locally in the broker and contains a ReadOnlyLedgerHandle and an AsyncLoadingCache. It is updated when a compaction finishes and does not change until the next compaction finishes.
   
   The previous compaction created the compactedTopicContext, in which the LedgerHandle metadata for 1705537 is [bookie-1, bookie-2, bookie-3]. After the bookies were shut down and autoRecovery ran, the quorum of 1705537 changed to [bookie-4, bookie-5, bookie-6], but the LedgerHandle metadata in the broker did not change. The broker therefore still tries to read 1705537 from bookie-1,2,3 and fails, so the compaction can never succeed.
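
The stale-handle behaviour can be reproduced with a toy model. This is not BookKeeper code: `metadataStore`, `liveBookies` and `CachedHandle` are hypothetical stand-ins for the ledger metadata in ZooKeeper, the running bookies, and a handle that copies its bookie list once at open time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model (not BookKeeper code) of a ledger handle that caches its bookie list at open time. */
class StaleEnsembleDemo {

    /** Stands in for the ledger metadata stored in ZooKeeper: ledgerId -> bookies. */
    static final Map<Long, List<String>> metadataStore = new HashMap<>();

    /** Bookies that are currently running. */
    static final Set<String> liveBookies = new HashSet<>();

    /** Hypothetical handle that copies the bookie list once and never refreshes it. */
    static final class CachedHandle {
        final List<String> bookies;

        CachedHandle(long ledgerId) {
            this.bookies = new ArrayList<>(metadataStore.get(ledgerId));
        }

        /** Returns the first reachable bookie from the cached list, or throws if none is running. */
        String readEntry() {
            for (String bookie : bookies) {
                if (liveBookies.contains(bookie)) {
                    return bookie;
                }
            }
            throw new IllegalStateException("Bookie handle is not available: " + bookies);
        }
    }

    /** Replays the scenario from this issue and reports "<stale handle result>,<fresh handle result>". */
    static String scenario() {
        metadataStore.clear();
        liveBookies.clear();
        long ledgerId = 1705537L;
        metadataStore.put(ledgerId, Arrays.asList("bookie-1", "bookie-2", "bookie-3"));
        liveBookies.addAll(Arrays.asList(
                "bookie-1", "bookie-2", "bookie-3", "bookie-4", "bookie-5", "bookie-6"));

        CachedHandle stale = new CachedHandle(ledgerId); // opened by the previous compaction

        // Bookies 1-3 go away and autoRecovery rewrites the metadata.
        liveBookies.removeAll(Arrays.asList("bookie-1", "bookie-2", "bookie-3"));
        metadataStore.put(ledgerId, Arrays.asList("bookie-4", "bookie-5", "bookie-6"));

        String staleResult;
        try {
            staleResult = stale.readEntry();
        } catch (IllegalStateException e) {
            staleResult = "FAILED";
        }
        // A handle opened now (comparable to a broker restart) sees the new bookies.
        String freshResult = new CachedHandle(ledgerId).readEntry();
        return staleResult + "," + freshResult;
    }

    public static void main(String[] args) {
        System.out.println(scenario());
    }
}
```

The handle opened before recovery keeps failing, while a handle opened afterwards reads from bookie-4. This matches what we see: the errors persist until the cached context is rebuilt.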
   
   The relevant code is:
   
https://github.com/apache/pulsar/blob/51202a6889582117bf790e9ff2325b9f3119510f/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L76-L209
   
   
https://github.com/apache/pulsar/blob/51202a6889582117bf790e9ff2325b9f3119510f/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L74-L199
   
   We temporarily fixed the problem by restarting all the brokers. After a broker restart, the previous compactedTopicContext no longer exists, so compaction can succeed.
   
   But this is not user-friendly, since we may need to restart the brokers every time bookies are shut down. We'd better fix this issue.
   
   After diving into the code, we found that there are two ways to open a ReadOnlyLedgerHandle in the BookKeeper client: asyncOpenLedgerNoRecovery and asyncOpenLedger.
   - asyncOpenLedger opens the ledger and, if the state in the ledger's metadata is OPEN, may try to recover it. But it does not register a listener for updates to the metadata zk node.
   - Only asyncOpenLedgerNoRecovery watches the zk metadata and updates the local metadata.
   
   We can replace asyncOpenLedger with asyncOpenLedgerNoRecovery. I guess the compactedLedger will not be in the OPEN state, and its local metadata needs to be kept up to date.
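
The difference between the two open paths can be sketched with a plain listener pattern. This is only an illustration of the idea, not the BookKeeper API: `MetadataStore`, `Handle` and the `watch` flag are hypothetical names, with `watch = true` playing the role of asyncOpenLedgerNoRecovery and `watch = false` the role of asyncOpenLedger as described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Toy model (not the BookKeeper API) of a handle that optionally watches metadata updates. */
class WatchedEnsembleDemo {

    /** Hypothetical metadata store that notifies registered listeners on every update. */
    static final class MetadataStore {
        private List<String> bookies;
        private final List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();

        MetadataStore(List<String> initial) {
            this.bookies = new ArrayList<>(initial);
        }

        List<String> read() {
            return new ArrayList<>(bookies);
        }

        void watch(Consumer<List<String>> listener) {
            listeners.add(listener);
        }

        void update(List<String> newBookies) {
            bookies = new ArrayList<>(newBookies);
            for (Consumer<List<String>> listener : listeners) {
                listener.accept(read());
            }
        }
    }

    /** Hypothetical read-only handle; it refreshes its local copy only when watch is true. */
    static final class Handle {
        private volatile List<String> bookies;

        Handle(MetadataStore store, boolean watch) {
            this.bookies = store.read();
            if (watch) {
                store.watch(updated -> this.bookies = updated);
            }
        }

        List<String> bookies() {
            return bookies;
        }
    }

    public static void main(String[] args) {
        MetadataStore store = new MetadataStore(Arrays.asList("bookie-1", "bookie-2", "bookie-3"));
        Handle unwatched = new Handle(store, false);
        Handle watched = new Handle(store, true);

        store.update(Arrays.asList("bookie-4", "bookie-5", "bookie-6")); // autoRecovery

        System.out.println("unwatched: " + unwatched.bookies());
        System.out.println("watched:   " + watched.bookies());
    }
}
```

Under these assumptions only the watched handle follows the metadata change, which is the behaviour the proposed switch relies on.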
   
   
   
   ### What did you expect to see?
   
   Compaction succeeds after the 3-bookie quorum of the compactedLedger has been auto-recovered.
   
   ### What did you see instead?
   
   Compaction keeps failing after the 3-bookie quorum of the compactedLedger has been auto-recovered.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

