This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7296274 Fix publish callback's entry data is null during ledger rollover (#10467)
7296274 is described below
commit 72962742aecf953fffabfdcfacbdcbb70a1ebba2
Author: Yunze Xu <[email protected]>
AuthorDate: Tue May 4 02:12:34 2021 +0800
Fix publish callback's entry data is null during ledger rollover (#10467)
* Fix publish callback's entry data is null during ledger rollover
* Don't call recycle() when the callback is null
---
.../org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 10 ++++++----
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 +++++++++++------
2 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index fc3054e..e30c578 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -197,17 +197,17 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
ml.lastConfirmedEntry = lastEntry;
if (closeWhenDone) {
- ReferenceCountUtil.release(data);
log.info("[{}] Closing ledger {} for being full", ml.getName(),
ledger.getId());
+ // `data` will be released in `closeComplete`
ledger.asyncClose(this, ctx);
} else {
updateLatency();
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
cb.addComplete(lastEntry, data.asReadOnly(), ctx);
- ReferenceCountUtil.release(data);
ml.notifyCursors();
ml.notifyWaitingEntryCallBacks();
+ ReferenceCountUtil.release(data);
this.recycle();
} else {
ReferenceCountUtil.release(data);
@@ -231,10 +231,13 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
- cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
+ cb.addComplete(PositionImpl.get(lh.getId(), entryId), data.asReadOnly(), ctx);
ml.notifyCursors();
ml.notifyWaitingEntryCallBacks();
+ ReferenceCountUtil.release(data);
this.recycle();
+ } else {
+ ReferenceCountUtil.release(data);
}
}
@@ -345,5 +348,4 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
", dataLength=" + dataLength +
'}';
}
-
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8018982..9bca663 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -552,13 +552,15 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@Test(timeOut = 20000)
public void asyncAddEntryWithoutError() throws Exception {
- ManagedLedger ledger = factory.open("my_test_ledger");
+ ManagedLedger ledger = factory.open("my_test_ledger",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
ledger.openCursor("test-cursor");
- final CountDownLatch counter = new CountDownLatch(1);
+ final int count = 4;
+ final CountDownLatch counter = new CountDownLatch(count);
final byte[] bytes = "dummy-entry-1".getBytes(Encoding);
- ledger.asyncAddEntry(bytes, new AddEntryCallback() {
+ AddEntryCallback callback = new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
assertNull(ctx);
@@ -579,11 +581,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
fail(exception.getMessage());
}
- }, null);
+ };
+ for (int i = 0; i < count; i++) {
+ ledger.asyncAddEntry(bytes, callback, null);
+ }
counter.await();
- assertEquals(ledger.getNumberOfEntries(), 1);
- assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length);
+ assertEquals(ledger.getNumberOfEntries(), count);
+ assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length * count);
}
@Test(timeOut = 20000)