This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 20e924b Fix lost message issues 12221 (#12223)
20e924b is described below
commit 20e924b616cd899cdc43d93e45e3917bcb30d4f1
Author: baomingyu <[email protected]>
AuthorDate: Thu Sep 30 14:43:06 2021 +0800
Fix lost message issues 12221 (#12223)
Fixes #12221
*(or if this PR is one task of a github issue, please add `Master Issue:
#<xyz>` to link to the master issue.)*
Master Issue: #12221
### Motivation
There are three ways a new ledger can be created when the current ledger is full.
First: after the last entry is added to a full ledger, ledger creation is
triggered.
Second: when checking whether the ledger is full, if it is full and
pendingAddEntries is not empty, ledger creation is triggered.
Third: when checking whether the ledger is full, if it is full, ledger
creation is triggered.
So, when these paths run concurrently, we need to check whether a new ledger
is already being created or has already been created, to avoid creating three
new ledgers at once.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index f2fdb84..0dda242 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1468,6 +1468,12 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (!pendingAddEntries.isEmpty()) {
// Need to create a new ledger to write pending entries
+ createLedgerAfterClosed();
+ }
+ }
+
+ synchronized void createLedgerAfterClosed() {
+ if(isNeededCreateNewLedgerAfterCloseLedger()) {
log.info("[{}] Creating a new ledger", name);
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp =
System.currentTimeMillis();
@@ -1476,11 +1482,12 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
}
- synchronized void createLedgerAfterClosed() {
- STATE_UPDATER.set(this, State.CreatingLedger);
- this.lastLedgerCreationInitiationTimestamp =
System.currentTimeMillis();
- mbean.startDataLedgerCreateOp();
- asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap());
+ boolean isNeededCreateNewLedgerAfterCloseLedger() {
+ final State state = STATE_UPDATER.get(this);
+ if (state != State.CreatingLedger && state != State.LedgerOpened) {
+ return true;
+ }
+ return false;
}
@Override