This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 345626947e4583d58cc0ceb9f6491a7339d6aadf Author: lipenghui <[email protected]> AuthorDate: Wed Aug 25 11:27:12 2021 +0800 Fix the topic in fenced state and can not recover. (#11737) * Fix the topic in fenced state and can not recover. Here is the log when the issue happens. The producer continues to reconnect to the broker, but the fenced state of the topic is always true. ``` 19:01:42.351 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052][persistent://public/default/test-8] Creating producer. producerId=8 19:01:42.352 [Thread-174681] INFO org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052] persistent://public/default/test-8 configured with schema false 19:01:42.352 [Thread-174681] WARN org.apache.pulsar.broker.service.AbstractTopic - [persistent://public/default/test-8] Attempting to add producer to a fenced topic 19:01:42.352 [Thread-174681] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052] Failed to add producer to topic persistent://public/default/test-8: producerId=8, org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable ``` After check the heap dump of the broker, the `pendingWriteOps` is 5, this is the reason why the topic can not recover from the fenced state. The topic will change to unfenced only the `pendingWriteOps` is 0, details can find at [PersistentTopic.decrementPendingWriteOpsAndCheck()](https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L463) But after checking the ML state of the topic, it shows the `pendingAddEntries` is 0 which not equals to `pendingWriteOps` of the topic. The root cause is we are polling add entry op from the `pendingAddEntries` in multiple threads, one is the the ZK callback thread when complete the ledger creating (https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1406, https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedL [...] another one is the ML worker thread when complete the add entry op (https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L181) After the ongoing add entry op complete, but the corresponding op might been polled by the `clearPendingAddEntries` method. So it will poll another one, but due to not equals to the current op, the polled op will not get a chance to be failed, so that the `pendingWriteOps` will not change to 0. I have attached the complete logs for the topic: The fix is to complete the add entry op with ManagedLedgerException if the polled op is not equals to the current op. * Release buffer. * Revert (cherry picked from commit 1bcbab04df4055714036afa3ce3bf6cf370869c9) --- .../main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index c76d532..665b138 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -137,6 +137,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba ReferenceCountUtil.release(data); cb.addFailed(e, ctx); ml.mbean.recordAddEntryError(); + this.recycle(); } } @@ -179,7 +180,13 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba public void safeRun() { // Remove this entry from the head of the pending queue OpAddEntry firstInQueue = ml.pendingAddEntries.poll(); - checkArgument(this == firstInQueue); + if (firstInQueue == null) { + return; + } + if (this != firstInQueue) { + firstInQueue.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op.")); + return; + } ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml); ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
