This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 963c1426ae8a1f62be4ba1f975dd42edb5cf2b3e Author: Yunze Xu <[email protected]> AuthorDate: Sat Sep 18 20:43:02 2021 +0800 Avoid adding duplicated BrokerEntryMetadata (#12018) ### Motivation When the Pulsar cluster enables broker entry metadata, some entries are sometimes corrupted. See https://github.com/streamnative/kop/issues/442 for an example. This is because the broker entry metadata has been added twice. This bug might have been introduced by https://github.com/apache/pulsar/pull/9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518 It happened during a managed ledger's rollover while there were some pending `OpAddEntry`s. In `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified. ### Modifications Call `beforeAddEntry` only once, at the beginning of `internalAsyncAddEntry`. (cherry picked from commit 9d44617a204f4aba4453659f76aded804a04cf6d) --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5892194..2a7f659 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -717,6 +717,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + if (!beforeAddEntry(addOperation)) { + return; + } pendingAddEntries.add(addOperation); final State state = STATE_UPDATER.get(this); if (state == State.Fenced) { @@ -779,10 +782,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback { addOperation.setCloseWhenDone(true); STATE_UPDATER.set(this, State.ClosingLedger); } - // interceptor entry before add to bookie - if (beforeAddEntry(addOperation)) { - addOperation.initiate(); - } + addOperation.initiate(); } } @@ -1513,9 +1513,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ReferenceCountUtil.release(existsOp.data); } existsOp.setLedger(currentLedger); - if (beforeAddEntry(existsOp)) { - pendingAddEntries.add(existsOp); - } + pendingAddEntries.add(existsOp); } } while (existsOp != null && --pendingSize > 0);
