This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 20e924b  Fix lost message issues 12221 (#12223)
20e924b is described below

commit 20e924b616cd899cdc43d93e45e3917bcb30d4f1
Author: baomingyu <[email protected]>
AuthorDate: Thu Sep 30 14:43:06 2021 +0800

    Fix lost message issues 12221 (#12223)
    
    Fixes #12221
    
    *(or if this PR is one task of a github issue, please add `Master Issue: 
#<xyz>` to link to the master issue.)*
    
    Master Issue: #12221
    
    ### Motivation
    There are three ways a new ledger can be created when the current ledger is full.
    First: after the last entry has been added to the full ledger, the 
create-ledger processing is triggered.
    Second: a check finds that the ledger is full and pendingAddEntries is not 
empty, so the create-ledger processing is triggered.
    Third: a check finds that the ledger is full, so the create-ledger 
processing is triggered.
    
    Because these paths can run concurrently, we need to check whether a new 
ledger is already being created or has already been created before creating 
one, to avoid creating three new ledgers at the same time.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java      | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f2fdb84..0dda242 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1468,6 +1468,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         if (!pendingAddEntries.isEmpty()) {
             // Need to create a new ledger to write pending entries
+            createLedgerAfterClosed();
+        }
+    }
+
+    synchronized void createLedgerAfterClosed() {
+        if(isNeededCreateNewLedgerAfterCloseLedger()) {
             log.info("[{}] Creating a new ledger", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
@@ -1476,11 +1482,12 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
     }
 
-    synchronized void createLedgerAfterClosed() {
-        STATE_UPDATER.set(this, State.CreatingLedger);
-        this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
-        mbean.startDataLedgerCreateOp();
-        asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
+    boolean isNeededCreateNewLedgerAfterCloseLedger() {
+        final State state = STATE_UPDATER.get(this);
+        if (state != State.CreatingLedger && state != State.LedgerOpened) {
+            return true;
+        }
+        return false;
     }
 
     @Override

Reply via email to