This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e3c83866af432d6230a391a934d93cf752c05392 Author: baomingyu <[email protected]> AuthorDate: Thu Sep 30 14:43:06 2021 +0800 Fix lost message issues 12221 (#12223) Fixes #12221 *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)* Master Issue: #12221 ### Motivation There are three way to create new ledger when one ledger is full. first: after add last entry for full ledger, it will handle create ledger processing. second: check ledger is or not full, when ledger is full and appendingAddEntries is not empty, it will handle create ledger processing. third: check ledger is or not full, when ledger is full, it will handle create ledger processing. So we need check it is or not creating or has created when concurrent create new ledger. avoid to create three new ledgers one time. (cherry picked from commit eb9d9d4484964c0f786ef77318e938b7194272ce) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3419578..4cb554c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1584,6 +1584,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (!pendingAddEntries.isEmpty()) { // Need to create a new ledger to write pending entries + createLedgerAfterClosed(); + } + } + + synchronized void createLedgerAfterClosed() { + if(isNeededCreateNewLedgerAfterCloseLedger()) { log.info("[{}] Creating a new ledger", name); STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); @@ -1592,11 +1598,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - synchronized void createLedgerAfterClosed() { - STATE_UPDATER.set(this, State.CreatingLedger); - this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); - mbean.startDataLedgerCreateOp(); - asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); + boolean isNeededCreateNewLedgerAfterCloseLedger() { + final State state = STATE_UPDATER.get(this); + if (state != State.CreatingLedger && state != State.LedgerOpened) { + return true; + } + return false; } @VisibleForTesting
