TakaHiR07 opened a new issue, #21551: URL: https://github.com/apache/pulsar/issues/21551
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

branch-2.9 (I guess the master branch also has this problem)

### Minimal reproduce step

Steps to reproduce:

1. Start with a `__change_events` policy topic whose compactedLedger 1705537 has its 3 quorum on bookie-1, bookie-2, and bookie-3.
2. Shut down bookie-1, bookie-2, and bookie-3 one by one, which triggers bookie autoRecovery.
3. The quorum of compactedLedger 1705537 becomes bookie-4, bookie-5, and bookie-6.
4. Use the pulsar-admin API to setMessageTTL, which triggers a message write to the `__change_events` topic.
5. The broker log shows many errors, indicating that the compaction triggered in BrokerService#checkCompaction() always fails.

The error log is as follows:

```
14:49:15.197 [compaction-76-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__change_events-partition-2][__compaction] Get topic last message Id
14:49:15.197 [broker-topic-workers-OrderedExecutor-1-0] ERROR org.apache.bookkeeper.proto.PerChannelBookieClient - Could not connect to bookie: null/bookie-1, current state CONNECTING :
org.apache.bookkeeper.proto.BookieAddressResolver$BookieIdNotResolvedException: Cannot resolve bookieId bookie-1, bookie does not exist or it is not running
    at org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:67) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.PerChannelBookieClient.connect(PerChannelBookieClient.java:531) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.PerChannelBookieClient.connectIfNeededAndDoOp(PerChannelBookieClient.java:657) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.obtain(DefaultPerChannelBookieClientPool.java:121) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.obtain(DefaultPerChannelBookieClientPool.java:116) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.BookieClientImpl.readEntry(BookieClientImpl.java:509) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.BookieClientImpl.readEntry(BookieClientImpl.java:495) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.proto.BookieClientImpl.readEntry(BookieClientImpl.java:489) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.PendingReadOp.sendReadTo(PendingReadOp.java:576) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.PendingReadOp$SequenceReadRequest.sendNextRead(PendingReadOp.java:405) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.PendingReadOp$SequenceReadRequest.read(PendingReadOp.java:386) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.PendingReadOp.initiate(PendingReadOp.java:530) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.PendingReadOp.safeRun(PendingReadOp.java:540) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.LedgerHandle.readEntriesInternalAsync(LedgerHandle.java:896) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntriesInternal(LedgerHandle.java:800) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:694) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:224) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$7(CompactedTopicImpl.java:215) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at com.github.benmanes.caffeine.cache.LocalAsyncCache.lambda$get$2(LocalAsyncCache.java:92) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2405) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908) ~[?:?]
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2403) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2386) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at com.github.benmanes.caffeine.cache.LocalAsyncCache.get(LocalAsyncCache.java:91) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at com.github.benmanes.caffeine.cache.LocalAsyncCache.get(LocalAsyncCache.java:82) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:79) ~[com.github.ben-manes.caffeine-caffeine-2.9.1.jar:?]
    at org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:189) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:179) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$3(CompactedTopicImpl.java:106) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1106) ~[?:?]
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
    at org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:105) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:343) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalConsumerFlow(PersistentDispatcherSingleActiveConsumer.java:280) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$consumerFlow$5(PersistentDispatcherSingleActiveConsumer.java:256) ~[org.apache.pulsar-pulsar-broker-2.9.5.4.jar:2.9.5.4]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.9.5.4.jar:2.9.5.4]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:204) ~[org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]
    at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: Bookie handle is not available
    at org.apache.bookkeeper.discover.ZKRegistrationClient.getBookieServiceInfo(ZKRegistrationClient.java:248) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    at org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:47) ~[org.apache.bookkeeper-bookkeeper-server-4.14.1.jar:4.14.1]
    ... 42 more
14:49:15.198 [BookKeeperClientWorker-OrderedExecutor-1-0] ERROR org.apache.bookkeeper.client.PendingReadOp - Read of ledger entry failed: L1705537 E0-E0, Sent to [bookie-1, bookie-2, bookie-3], Heard from [] : bitset = {}, Error = 'Bookie handle is not available'.
First unread entry is (-1, rc = null)
14:49:15.199 [broker-topic-workers-OrderedExecutor-1-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://public/default/__change_events-partition-2 / __compaction-Consumer{subscription=CompactorSubscription{topic=persistent://public/default/__change_events-partition-2, name=__compaction}, consumerId=1011, consumerName=1bdee, address=/xxx}] Error reading entries at 1705285:48 : java.util.concurrent.CompletionException: org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: Bookie handle is not available - Retrying to read in 54.759 seconds
14:49:15.199 [pulsar-io-32-26] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/__change_events-partition-2][__compaction] Successfully getLastMessageId 1705285:48
14:49:15.199 [pulsar-io-32-26] INFO org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://public/default/__change_events-partition-2, reading to 1705285:48:2
```

We found that the cause is a failed read of compacted ledger 1705537 during TwoPhaseCompaction.

TwoPhaseCompaction proceeds as follows:

1. Check whether the topic has messages with reader.hasMessageAvailableAsync().
2. Run phase one of the compaction. This calls getLastMessageId and triggers phaseOneLoop().
3. Run phase two of the compaction. This creates a new compactedLedger, compacts all keys into it, and triggers phaseTwoSeekThenLoop().

In phaseOneLoop(), the compactor tries to read messages, which triggers CompactedTopicImpl#asyncReadEntriesOrWait(). That method then uses compactedTopicContext to findStartPoint. compactedTopicContext is held locally in the broker and contains a ReadOnlyLedgerHandle and an AsyncLoadingCache. It is updated by the previous compaction and then does not change until the next compaction finishes.
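The effect of holding on to a context that is only refreshed when a compaction finishes can be sketched with a toy model. Nothing here uses Pulsar or BookKeeper classes: `StaleContextSketch`, `SnapshotHandle`, and `METADATA_STORE` are hypothetical names, and the sketch assumes only that a handle copies the ledger's bookie list once, when it is opened.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class StaleContextSketch {
    /** Hypothetical stand-in for the metadata store: ledger id -> current bookie list. */
    static final Map<Long, List<String>> METADATA_STORE = new ConcurrentHashMap<>();

    /** Hypothetical stand-in for a read-only handle that copies the bookie list at open time. */
    static final class SnapshotHandle {
        final List<String> ensemble;

        SnapshotHandle(long ledgerId) {
            // The copy is taken once and never refreshed afterwards.
            this.ensemble = List.copyOf(METADATA_STORE.get(ledgerId));
        }

        /** In this toy model a read succeeds only if a bookie in the snapshot is still alive. */
        boolean canRead(Set<String> liveBookies) {
            return ensemble.stream().anyMatch(liveBookies::contains);
        }
    }

    public static void main(String[] args) {
        long ledgerId = 1705537L;
        METADATA_STORE.put(ledgerId, List.of("bookie-1", "bookie-2", "bookie-3"));

        // Handle kept in the cached context since the previous compaction.
        SnapshotHandle cached = new SnapshotHandle(ledgerId);

        // The ledger is re-replicated to new bookies and the old ones go away.
        METADATA_STORE.put(ledgerId, List.of("bookie-4", "bookie-5", "bookie-6"));
        Set<String> live = Set.of("bookie-4", "bookie-5", "bookie-6");

        System.out.println("cached handle can read: " + cached.canRead(live));                       // false
        System.out.println("fresh handle can read:  " + new SnapshotHandle(ledgerId).canRead(live)); // true
    }
}
```

In this model the cached handle keeps failing until something replaces it, while a handle opened after the change reads the current bookie list.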
The previous compaction created the compactedTopicContext, in which the LedgerHandle's metadata for ledger 1705537 is [bookie-1, bookie-2, bookie-3]. However, after the bookies were shut down and bookie autoRecovery ran, the 3 quorum of 1705537 changed to [bookie-4, bookie-5, bookie-6]. The LedgerHandle's metadata in the broker did not change, so the broker still tries to read messages of 1705537 from bookie-1,2,3 and fails, which means the compaction cannot succeed.

The relevant code is:

https://github.com/apache/pulsar/blob/51202a6889582117bf790e9ff2325b9f3119510f/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java#L76-L209

https://github.com/apache/pulsar/blob/51202a6889582117bf790e9ff2325b9f3119510f/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L74-L199

We temporarily fixed the problem by restarting all the brokers. After a broker restart, the previous compactedTopicContext no longer exists, so compaction can succeed. But this is not user-friendly, since we may need to restart the brokers every time bookies are shut down. We'd better fix this issue.

After diving into the code, we found that there are two ways to open a ReadOnlyLedgerHandle in BookKeeper: asyncOpenLedgerNoRecovery and asyncOpenLedger.

- asyncOpenLedger opens the ledger and, if the state in the ledger's metadata is OPEN, may try to recover it. But it does not register a listener for updates to the metadata zk node.
- Only asyncOpenLedgerNoRecovery watches the zk metadata and updates the local metadata.

We can replace asyncOpenLedger with asyncOpenLedgerNoRecovery. I guess the compactedLedger would not be in the OPEN state, and it needs its metadata to be updated.

### What did you expect to see?

Compaction succeeds after the 3 quorum of the compactedLedger is autoRecovered.

### What did you see instead?

Compaction keeps failing after the 3 quorum of the compactedLedger is autoRecovered.

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

-- This is an automated message from the Apache Git Service.
