This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e3c83866af432d6230a391a934d93cf752c05392
Author: baomingyu <[email protected]>
AuthorDate: Thu Sep 30 14:43:06 2021 +0800

    Fix lost message issues 12221 (#12223)
    
    Fixes #12221
    
    *(or if this PR is one task of a github issue, please add `Master Issue: 
#<xyz>` to link to the master issue.)*
    
    Master Issue: #12221
    
    ### Motivation
    There are three ways a new ledger can be created when a ledger is full.
    First: after the last entry is added to the full ledger, creation of a new 
ledger is triggered.
    Second: when checking whether the ledger is full, if it is full and 
pendingAddEntries is not empty, creation of a new ledger is triggered.
    Third: when checking whether the ledger is full, if it is full, creation 
of a new ledger is triggered.
    
    So, when these paths run concurrently, we need to check whether a new 
ledger is already being created or has already been created, to avoid 
creating three new ledgers at once.
    
    (cherry picked from commit eb9d9d4484964c0f786ef77318e938b7194272ce)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java      | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3419578..4cb554c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1584,6 +1584,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         if (!pendingAddEntries.isEmpty()) {
             // Need to create a new ledger to write pending entries
+            createLedgerAfterClosed();
+        }
+    }
+
+    synchronized void createLedgerAfterClosed() {
+        if(isNeededCreateNewLedgerAfterCloseLedger()) {
             log.info("[{}] Creating a new ledger", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
@@ -1592,11 +1598,12 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
     }
 
-    synchronized void createLedgerAfterClosed() {
-        STATE_UPDATER.set(this, State.CreatingLedger);
-        this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
-        mbean.startDataLedgerCreateOp();
-        asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
+    boolean isNeededCreateNewLedgerAfterCloseLedger() {
+        final State state = STATE_UPDATER.get(this);
+        if (state != State.CreatingLedger && state != State.LedgerOpened) {
+            return true;
+        }
+        return false;
     }
 
     @VisibleForTesting

Reply via email to