This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 345626947e4583d58cc0ceb9f6491a7339d6aadf
Author: lipenghui <[email protected]>
AuthorDate: Wed Aug 25 11:27:12 2021 +0800

    Fix topic stuck in the fenced state and unable to recover. (#11737)
    
    * Fix topic stuck in the fenced state and unable to recover.
    
    Here is the log when the issue happens. The producer keeps reconnecting to the broker, but the fenced state of the topic remains true.
    ```
    19:01:42.351 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052][persistent://public/default/test-8] Creating producer. producerId=8
    19:01:42.352 [Thread-174681] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052] persistent://public/default/test-8 configured with schema false
    19:01:42.352 [Thread-174681] WARN  org.apache.pulsar.broker.service.AbstractTopic - [persistent://public/default/test-8] Attempting to add producer to a fenced topic
    19:01:42.352 [Thread-174681] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052] Failed to add producer to topic persistent://public/default/test-8: producerId=8, org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable
    ```
    
    After checking the heap dump of the broker, `pendingWriteOps` is 5, which is why the topic cannot recover from the fenced state.
    
    The topic changes back to unfenced only when `pendingWriteOps` reaches 0; details can be found in [PersistentTopic.decrementPendingWriteOpsAndCheck()](https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L463)
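    For context, the counter-gated unfence check can be sketched like this (a minimal, hypothetical Java sketch; the class and method names are simplified stand-ins, not the actual `PersistentTopic` code):
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    
    // Hypothetical sketch of a counter-gated unfence check: the topic can
    // leave the fenced state only once every pending write is accounted for.
    class FencedTopicSketch {
        private final AtomicInteger pendingWriteOps = new AtomicInteger(0);
        private volatile boolean fenced = false;
    
        void beginWrite() { pendingWriteOps.incrementAndGet(); }
    
        void fence() { fenced = true; }
    
        // Mirrors the idea of decrementPendingWriteOpsAndCheck():
        // unfence only when the last pending write completes or fails.
        void completeWrite() {
            if (pendingWriteOps.decrementAndGet() == 0 && fenced) {
                fenced = false;
            }
        }
    
        boolean isFenced() { return fenced; }
    }
    ```
    
    If any pending write is never completed or failed, the counter never reaches 0 and the topic stays fenced forever, which is exactly the symptom described here.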
    
    But after checking the ML state of the topic, it shows that `pendingAddEntries` is 0, which does not equal the `pendingWriteOps` of the topic.
    The root cause is that we poll add entry ops from `pendingAddEntries` in multiple threads: one is the ZK callback thread when completing the ledger creation (https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1406, https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedL [...]
    the other is the ML worker thread when completing the add entry op (https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L181)
    
    When the ongoing add entry op completes, the corresponding op may have already been polled by the `clearPendingAddEntries` method. The worker thread then polls another op from the queue, but because it does not equal the current op, the polled op never gets a chance to be failed, so `pendingWriteOps` never drops back to 0.
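    The mismatch and the intended handling can be illustrated with a small, self-contained sketch (hypothetical classes, not the real managed-ledger types):
    
    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;
    
    // Hypothetical stand-in for an add entry op; "terminated" marks that the
    // op was completed or failed (which would decrement pendingWriteOps).
    class AddEntryOpSketch {
        final String name;
        boolean terminated;
        AddEntryOpSketch(String name) { this.name = name; }
        void failed() { terminated = true; }
    }
    
    class SafeRunSketch {
        // Mimics the fixed OpAddEntry.safeRun(): if the head of the queue is
        // not the current op (another thread already drained the queue), fail
        // the polled op instead of asserting, so it is still accounted for.
        static void safeRun(AddEntryOpSketch current, Queue<AddEntryOpSketch> pendingAddEntries) {
            AddEntryOpSketch firstInQueue = pendingAddEntries.poll();
            if (firstInQueue == null) {
                return;
            }
            if (current != firstInQueue) {
                firstInQueue.failed();
                return;
            }
            current.terminated = true; // normal completion path
        }
    }
    ```
    
    With the old `checkArgument(this == firstInQueue)` behavior, the mismatched op was neither completed nor failed, leaking its pending-write count.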
    
    I have attached the complete logs for the topic.
    
    The fix is to complete the add entry op with a `ManagedLedgerException` if the polled op does not equal the current op.
    
    * Release buffer.
    
    * Revert
    
    (cherry picked from commit 1bcbab04df4055714036afa3ce3bf6cf370869c9)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c76d532..665b138 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -137,6 +137,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
             ReferenceCountUtil.release(data);
             cb.addFailed(e, ctx);
             ml.mbean.recordAddEntryError();
+            this.recycle();
         }
     }
 
@@ -179,7 +180,13 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
     public void safeRun() {
         // Remove this entry from the head of the pending queue
         OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
-        checkArgument(this == firstInQueue);
+        if (firstInQueue == null) {
+            return;
+        }
+        if (this != firstInQueue) {
+            firstInQueue.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
+            return;
+        }
 
         ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
         ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
