This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eaaf17bfc557493073d592f7325a3c30e924b6b4
Author: lipenghui <[email protected]>
AuthorDate: Thu Jan 9 14:18:01 2020 +0800

    Avoid using same OpAddEntry between different ledger handles (#5942)
    
    ### Motivation
    
    Avoid using same OpAddEntry between different ledger handles.
    
    ### Modifications
    
    Add state for OpAddEntry, if op handled by new ledger handle, the op will 
set to CLOSED state, after the legacy callback happens will check the op state, 
only INITIATED can be processed.
    
    When ledger rollover happens, pendingAddEntries will be processed. when 
process pendingAddEntries, will create a new OpAddEntry by the old OpAddEntry 
to avoid different ledger handles use same OpAddEntry.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++--
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 49 ++++++++++++++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 38 +++++++++++++++++
 3 files changed, 95 insertions(+), 12 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a80386..85850ac 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -553,13 +553,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
         executor.executeOrdered(name, safeRun(() -> {
-            pendingAddEntries.add(addOperation);
-
             internalAsyncAddEntry(addOperation);
         }));
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
+        pendingAddEntries.add(addOperation);
         final State state = STATE_UPDATER.get(this);
         if (state == State.Fenced) {
             addOperation.failed(new ManagedLedgerFencedException());
@@ -1294,9 +1293,24 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             log.debug("[{}] Resending {} pending messages", name, 
pendingAddEntries.size());
         }
 
+        // Avoid use same OpAddEntry between different ledger handle
+        int pendingSize = pendingAddEntries.size();
+        OpAddEntry existsOp;
+        do {
+            existsOp = pendingAddEntries.poll();
+            if (existsOp != null) {
+                // If op is used by another ledger handle, we need to close it 
and create a new one
+                if (existsOp.ledger != null) {
+                    existsOp.close();
+                    existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, 
existsOp.callback, existsOp.ctx);
+                }
+                existsOp.setLedger(currentLedger);
+                pendingAddEntries.add(existsOp);
+            }
+        } while (existsOp != null && --pendingSize > 0);
+
         // Process all the pending addEntry requests
         for (OpAddEntry op : pendingAddEntries) {
-            op.setLedger(currentLedger);
             ++currentLedgerEntries;
             currentLedgerSize += op.data.readableBytes();
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index e882458..6c2e872 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -43,12 +43,14 @@ import org.slf4j.LoggerFactory;
  *
  */
 class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
-    private ManagedLedgerImpl ml;
+    protected ManagedLedgerImpl ml;
     LedgerHandle ledger;
     private long entryId;
 
     @SuppressWarnings("unused")
-    private volatile AddEntryCallback callback;
+    private static final AtomicReferenceFieldUpdater<OpAddEntry, 
AddEntryCallback> callbackUpdater =
+            AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, 
AddEntryCallback.class, "callback");
+    protected volatile AddEntryCallback callback;
     Object ctx;
     volatile long addOpCount;
     private static final AtomicLongFieldUpdater<OpAddEntry> 
ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
@@ -60,8 +62,16 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
     ByteBuf data;
     private int dataLength;
 
-    private static final AtomicReferenceFieldUpdater<OpAddEntry, 
AddEntryCallback> callbackUpdater =
-        AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, 
AddEntryCallback.class, "callback");
+    private static final AtomicReferenceFieldUpdater<OpAddEntry, 
OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
+            .newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
+    volatile State state;
+
+    enum State {
+        OPEN,
+        INITIATED,
+        COMPLETED,
+        CLOSED
+    }
 
     public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, 
AddEntryCallback callback, Object ctx) {
         OpAddEntry op = RECYCLER.get();
@@ -75,6 +85,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, 
CloseCallback {
         op.closeWhenDone = false;
         op.entryId = -1;
         op.startTime = System.nanoTime();
+        op.state = State.OPEN;
         ml.mbean.addAddEntrySample(op.dataLength);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -91,12 +102,16 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
     }
 
     public void initiate() {
-        ByteBuf duplicateBuffer = data.retainedDuplicate();
+        if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, 
State.INITIATED)) {
+            ByteBuf duplicateBuffer = data.retainedDuplicate();
 
-        // internally asyncAddEntry() will take the ownership of the buffer 
and release it at the end
-        addOpCount = 
ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);;
-        lastInitTime = System.nanoTime();
-        ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
+            // internally asyncAddEntry() will take the ownership of the 
buffer and release it at the end
+            addOpCount = 
ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
+            lastInitTime = System.nanoTime();
+            ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
+        } else {
+            log.warn("[{}] initiate with unexpected state {}, expect OPEN 
state.", ml.getName(), state);
+        }
     }
 
     public void failed(ManagedLedgerException e) {
@@ -110,6 +125,13 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
 
     @Override
     public void addComplete(int rc, final LedgerHandle lh, long entryId, 
Object ctx) {
+
+        if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, 
State.COMPLETED)) {
+            log.warn("[{}] The add op is terminal legacy callback for entry 
{}-{} adding.", ml.getName(), lh.getId(), entryId);
+            OpAddEntry.this.recycle();
+            return;
+        }
+
         if (ledger.getId() != lh.getId()) {
             log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", 
ml.getName(), ledger.getId(), lh.getId());
         }
@@ -216,6 +238,7 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
 
     void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
         if (checkAndCompleteOp(ctx)) {
+            this.close();
             this.handleAddFailure(ledger);
         }
     }
@@ -237,6 +260,14 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
             ml.ledgerClosed(ledger);
         }));
     }
+
+    void close() {
+        STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
+    }
+
+    public State getState() {
+        return state;
+    }
     
     private final Handle<OpAddEntry> recyclerHandle;
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d94099b..b954f65 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -38,6 +38,7 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
@@ -45,6 +46,7 @@ import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -112,6 +114,7 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@@ -2489,6 +2492,41 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
     }
 
+    @Test
+    public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws 
Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+        config.setMaxCacheSize(0);
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, 
zkc, config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("my_test_ledger");
+
+        List<OpAddEntry> oldOps = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            OpAddEntry op = OpAddEntry.create(ledger, 
ByteBufAllocator.DEFAULT.buffer(128), null, null);
+            if (i > 4) {
+                op.setLedger(mock(LedgerHandle.class));
+            }
+            oldOps.add(op);
+            ledger.pendingAddEntries.add(op);
+        }
+
+        ledger.updateLedgersIdsComplete(mock(Stat.class));
+        for (int i = 0; i < 10; i++) {
+            OpAddEntry oldOp = oldOps.get(i);
+            if (i > 4) {
+                Assert.assertEquals(oldOp.getState(), OpAddEntry.State.CLOSED);
+            } else {
+                Assert.assertEquals(oldOp.getState(), 
OpAddEntry.State.INITIATED);
+            }
+            OpAddEntry newOp = ledger.pendingAddEntries.poll();
+            Assert.assertEquals(newOp.getState(), OpAddEntry.State.INITIATED);
+            if (i > 4) {
+                Assert.assertNotSame(oldOp, newOp);
+            } else {
+                Assert.assertSame(oldOp, newOp);
+            }
+        }
+    }
+
     private void setFieldValue(Class clazz, Object classObj, String fieldName, 
Object fieldValue) throws Exception {
         Field field = clazz.getDeclaredField(fieldName);
         field.setAccessible(true);

Reply via email to