This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7296274  Fix publish callback's entry data being null during ledger rollover (#10467)
7296274 is described below

commit 72962742aecf953fffabfdcfacbdcbb70a1ebba2
Author: Yunze Xu <[email protected]>
AuthorDate: Tue May 4 02:12:34 2021 +0800

    Fix publish callback's entry data being null during ledger rollover (#10467)
    
    * Fix publish callback's entry data being null during ledger rollover
    
    * Don't call recycle() when the callback is null
---
 .../org/apache/bookkeeper/mledger/impl/OpAddEntry.java  | 10 ++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java      | 17 +++++++++++------
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index fc3054e..e30c578 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -197,17 +197,17 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
         ml.lastConfirmedEntry = lastEntry;
 
         if (closeWhenDone) {
-            ReferenceCountUtil.release(data);
             log.info("[{}] Closing ledger {} for being full", ml.getName(), 
ledger.getId());
+            // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
             updateLatency();
             AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
             if (cb != null) {
                 cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ReferenceCountUtil.release(data);
                 ml.notifyCursors();
                 ml.notifyWaitingEntryCallBacks();
+                ReferenceCountUtil.release(data);
                 this.recycle();
             } else {
                 ReferenceCountUtil.release(data);
@@ -231,10 +231,13 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
 
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
         if (cb != null) {
-            cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
+            cb.addComplete(PositionImpl.get(lh.getId(), entryId), 
data.asReadOnly(), ctx);
             ml.notifyCursors();
             ml.notifyWaitingEntryCallBacks();
+            ReferenceCountUtil.release(data);
             this.recycle();
+        } else {
+            ReferenceCountUtil.release(data);
         }
     }
 
@@ -345,5 +348,4 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
                 ", dataLength=" + dataLength +
                 '}';
     }
-
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8018982..9bca663 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -552,13 +552,15 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 
     @Test(timeOut = 20000)
     public void asyncAddEntryWithoutError() throws Exception {
-        ManagedLedger ledger = factory.open("my_test_ledger");
+        ManagedLedger ledger = factory.open("my_test_ledger",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
         ledger.openCursor("test-cursor");
 
-        final CountDownLatch counter = new CountDownLatch(1);
+        final int count = 4;
+        final CountDownLatch counter = new CountDownLatch(count);
 
         final byte[] bytes = "dummy-entry-1".getBytes(Encoding);
-        ledger.asyncAddEntry(bytes, new AddEntryCallback() {
+        AddEntryCallback callback = new AddEntryCallback() {
             @Override
             public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
                 assertNull(ctx);
@@ -579,11 +581,14 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                 fail(exception.getMessage());
             }
 
-        }, null);
+        };
+        for (int i = 0; i < count; i++) {
+            ledger.asyncAddEntry(bytes, callback, null);
+        }
 
         counter.await();
-        assertEquals(ledger.getNumberOfEntries(), 1);
-        assertEquals(ledger.getTotalSize(), 
"dummy-entry-1".getBytes(Encoding).length);
+        assertEquals(ledger.getNumberOfEntries(), count);
+        assertEquals(ledger.getTotalSize(), 
"dummy-entry-1".getBytes(Encoding).length * count);
     }
 
     @Test(timeOut = 20000)

Reply via email to