This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eaaf17bfc557493073d592f7325a3c30e924b6b4 Author: lipenghui <[email protected]> AuthorDate: Thu Jan 9 14:18:01 2020 +0800 Avoid using same OpAddEntry between different ledger handles (#5942) ### Motivation Avoid using the same OpAddEntry across different ledger handles. ### Modifications Add a state to OpAddEntry. If an op is taken over by a new ledger handle, the op is set to the CLOSED state; when the legacy callback later fires, it checks the op state, and only an op in the INITIATED state is processed. When a ledger rollover happens, pendingAddEntries is processed: each pending OpAddEntry that was already assigned to a ledger handle is closed and replaced by a new OpAddEntry created from it, so that different ledger handles never use the same OpAddEntry. --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++-- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 49 ++++++++++++++++++---- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 38 +++++++++++++++++ 3 files changed, 95 insertions(+), 12 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3a80386..85850ac 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -553,13 +553,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Jump to specific thread to avoid contention from writers writing from different threads executor.executeOrdered(name, safeRun(() -> { - pendingAddEntries.add(addOperation); - internalAsyncAddEntry(addOperation); })); } private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + pendingAddEntries.add(addOperation); final State state = STATE_UPDATER.get(this); if (state == State.Fenced) { addOperation.failed(new ManagedLedgerFencedException()); @@ -1294,9 +1293,24 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); } + // Avoid use same OpAddEntry between different ledger handle + int pendingSize = pendingAddEntries.size(); + OpAddEntry existsOp; + do { + existsOp = pendingAddEntries.poll(); + if (existsOp != null) { + // If op is used by another ledger handle, we need to close it and create a new one + if (existsOp.ledger != null) { + existsOp.close(); + existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.callback, existsOp.ctx); + } + existsOp.setLedger(currentLedger); + pendingAddEntries.add(existsOp); + } + } while (existsOp != null && --pendingSize > 0); + // Process all the pending addEntry requests for (OpAddEntry op : pendingAddEntries) { - op.setLedger(currentLedger); ++currentLedgerEntries; currentLedgerSize += op.data.readableBytes(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index e882458..6c2e872 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -43,12 +43,14 @@ import org.slf4j.LoggerFactory; * */ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { - private ManagedLedgerImpl ml; + protected ManagedLedgerImpl ml; LedgerHandle ledger; private long entryId; @SuppressWarnings("unused") - private volatile AddEntryCallback callback; + private static final AtomicReferenceFieldUpdater<OpAddEntry, AddEntryCallback> callbackUpdater = + AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback"); + protected volatile AddEntryCallback callback; Object ctx; volatile long addOpCount; private static final AtomicLongFieldUpdater<OpAddEntry> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater @@ -60,8 +62,16 @@ class OpAddEntry extends 
SafeRunnable implements AddCallback, CloseCallback { ByteBuf data; private int dataLength; - private static final AtomicReferenceFieldUpdater<OpAddEntry, AddEntryCallback> callbackUpdater = - AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback"); + private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state"); + volatile State state; + + enum State { + OPEN, + INITIATED, + COMPLETED, + CLOSED + } public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) { OpAddEntry op = RECYCLER.get(); @@ -75,6 +85,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { op.closeWhenDone = false; op.entryId = -1; op.startTime = System.nanoTime(); + op.state = State.OPEN; ml.mbean.addAddEntrySample(op.dataLength); if (log.isDebugEnabled()) { log.debug("Created new OpAddEntry {}", op); @@ -91,12 +102,16 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { } public void initiate() { - ByteBuf duplicateBuffer = data.retainedDuplicate(); + if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) { + ByteBuf duplicateBuffer = data.retainedDuplicate(); - // internally asyncAddEntry() will take the ownership of the buffer and release it at the end - addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);; - lastInitTime = System.nanoTime(); - ledger.asyncAddEntry(duplicateBuffer, this, addOpCount); + // internally asyncAddEntry() will take the ownership of the buffer and release it at the end + addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); + lastInitTime = System.nanoTime(); + ledger.asyncAddEntry(duplicateBuffer, this, addOpCount); + } else { + log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state); + } } public void 
failed(ManagedLedgerException e) { @@ -110,6 +125,13 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { @Override public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) { + + if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) { + log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId); + OpAddEntry.this.recycle(); + return; + } + if (ledger.getId() != lh.getId()) { log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId()); } @@ -216,6 +238,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) { if (checkAndCompleteOp(ctx)) { + this.close(); this.handleAddFailure(ledger); } } @@ -237,6 +260,14 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { ml.ledgerClosed(ledger); })); } + + void close() { + STATE_UPDATER.set(OpAddEntry.this, State.CLOSED); + } + + public State getState() { + return state; + } private final Handle<OpAddEntry> recyclerHandle; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d94099b..b954f65 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -38,6 +38,7 @@ import com.google.common.base.Charsets; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import java.io.IOException; @@ -45,6 +46,7 @@ import java.lang.reflect.Field; import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.ArrayList; +import 
java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -112,6 +114,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; public class ManagedLedgerTest extends MockedBookKeeperTestCase { @@ -2489,6 +2492,41 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null); } + @Test + public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception { + ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); + config.setMaxCacheSize(0); + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, zkc, config); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger"); + + List<OpAddEntry> oldOps = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null); + if (i > 4) { + op.setLedger(mock(LedgerHandle.class)); + } + oldOps.add(op); + ledger.pendingAddEntries.add(op); + } + + ledger.updateLedgersIdsComplete(mock(Stat.class)); + for (int i = 0; i < 10; i++) { + OpAddEntry oldOp = oldOps.get(i); + if (i > 4) { + Assert.assertEquals(oldOp.getState(), OpAddEntry.State.CLOSED); + } else { + Assert.assertEquals(oldOp.getState(), OpAddEntry.State.INITIATED); + } + OpAddEntry newOp = ledger.pendingAddEntries.poll(); + Assert.assertEquals(newOp.getState(), OpAddEntry.State.INITIATED); + if (i > 4) { + Assert.assertNotSame(oldOp, newOp); + } else { + Assert.assertSame(oldOp, newOp); + } + } + } + private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception { Field field = clazz.getDeclaredField(fieldName); field.setAccessible(true);
