This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 963c1426ae8a1f62be4ba1f975dd42edb5cf2b3e
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Sep 18 20:43:02 2021 +0800

    Avoid adding duplicated BrokerEntryMetadata (#12018)
    
    ### Motivation
    
    When the Pulsar cluster enables broker entry metadata, sometimes there're 
some corrupted entries. See https://github.com/streamnative/kop/issues/442 for 
example. It's because the broker entry metadata has been added twice.
    
    This bug might be introduced from 
https://github.com/apache/pulsar/pull/9039 
https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518
    
    It happened during a managed ledger's rollover while there're some pending 
`OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be 
updated and the data of `OpAddEntry` should not be modified.
    
    ### Modifications
    
    Only call `beforeAddEntry` for once at the beginning of 
`internalAsyncAddEntry`.
    
    (cherry picked from commit 9d44617a204f4aba4453659f76aded804a04cf6d)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java    | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5892194..2a7f659 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -717,6 +717,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
+        if (!beforeAddEntry(addOperation)) {
+            return;
+        }
         pendingAddEntries.add(addOperation);
         final State state = STATE_UPDATER.get(this);
         if (state == State.Fenced) {
@@ -779,10 +782,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 addOperation.setCloseWhenDone(true);
                 STATE_UPDATER.set(this, State.ClosingLedger);
             }
-            // interceptor entry before add to bookie
-            if (beforeAddEntry(addOperation)) {
-                addOperation.initiate();
-            }
+            addOperation.initiate();
         }
     }
 
@@ -1513,9 +1513,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     ReferenceCountUtil.release(existsOp.data);
                 }
                 existsOp.setLedger(currentLedger);
-                if (beforeAddEntry(existsOp)) {
-                    pendingAddEntries.add(existsOp);
-                }
+                pendingAddEntries.add(existsOp);
             }
         } while (existsOp != null && --pendingSize > 0);
 

Reply via email to