This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 07a3767df36 [fix] [ml] Add entry fail due to race condition about add
entry failed/timeout and switch ledger (#22221)
07a3767df36 is described below
commit 07a3767df36dd86717887075c3bf76ba0fbd2081
Author: fengyubiao <[email protected]>
AuthorDate: Tue May 21 15:50:53 2024 +0800
[fix] [ml] Add entry fail due to race condition about add entry
failed/timeout and switch ledger (#22221)
(cherry picked from commit ae9616b7ee41e77dde590045b8e1d5bd04192ffc)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 30 ++++++++----
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 39 +++++++++-------
.../mledger/impl/ShadowManagedLedgerImpl.java | 3 ++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 53 +++++++++++++++++++++-
.../bookkeeper/client/PulsarMockBookKeeper.java | 7 +++
.../bookkeeper/client/PulsarMockLedgerHandle.java | 7 +++
6 files changed, 113 insertions(+), 26 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 07d0d40569a..167c0a1bad3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@@ -240,6 +241,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
protected volatile long lastAddEntryTimeMs = 0;
private long inactiveLedgerRollOverTimeMs = 0;
+ /** A signal that may cause all the subsequent OpAddEntry of the current
ledger to fail due to timeout. **/
+ protected volatile AtomicBoolean currentLedgerTimeoutTriggered;
+
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";
@@ -532,6 +536,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;
+ currentLedgerTimeoutTriggered = new AtomicBoolean();
lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if
possible.
@@ -774,7 +779,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Jump to specific thread to avoid contention from writers writing
from different threads
executor.execute(() -> {
- OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, callback, ctx);
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, callback, ctx,
+ currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
@@ -790,7 +796,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Jump to specific thread to avoid contention from writers writing
from different threads
executor.execute(() -> {
- OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, numberOfMessages, callback, ctx);
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, numberOfMessages, callback, ctx,
+ currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
@@ -842,6 +849,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Write into lastLedger
addOperation.setLedger(currentLedger);
+ addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered);
++currentLedgerEntries;
currentLedgerSize += addOperation.data.readableBytes();
@@ -1585,6 +1593,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
+ currentLedgerTimeoutTriggered = new AtomicBoolean();
currentLedgerEntries = 0;
currentLedgerSize = 0;
updateLedgersIdsComplete(originalCurrentLedger);
@@ -1668,9 +1677,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (existsOp != null) {
// If op is used by another ledger handle, we need to close it
and create a new one
if (existsOp.ledger != null) {
- existsOp.close();
- existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml,
existsOp.data,
- existsOp.getNumberOfMessages(), existsOp.callback,
existsOp.ctx);
+ existsOp =
existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
+ } else {
+ // This scenario should not happen.
+ log.warn("[{}] An OpAddEntry's ledger is empty.", name);
+
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
@@ -4164,13 +4175,14 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
- final long finalAddOpCount = opAddEntry.addOpCount;
boolean isTimedOut = opAddEntry.lastInitTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() -
opAddEntry.lastInitTime) >= timeoutSec;
if (isTimedOut) {
- log.error("Failed to add entry for ledger {} in time-out {}
sec",
- (opAddEntry.ledger != null ? opAddEntry.ledger.getId()
: -1), timeoutSec);
- opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger,
finalAddOpCount);
+ log.warn("[{}] Failed to add entry {}:{} in time-out {} sec",
this.name,
+ opAddEntry.ledger != null ? opAddEntry.ledger.getId()
: -1,
+ opAddEntry.entryId, timeoutSec);
+ currentLedgerTimeoutTriggered.set(true);
+ opAddEntry.handleAddFailure(opAddEntry.ledger);
}
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index ae2beafb643..acbb0da5a4e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -24,8 +24,10 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -45,7 +47,7 @@ import
org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
- private long entryId;
+ long entryId;
private int numberOfMessages;
@SuppressWarnings("unused")
@@ -68,6 +70,9 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class,
OpAddEntry.State.class, "state");
volatile State state;
+ @Setter
+ private AtomicBoolean timeoutTriggered;
+
enum State {
OPEN,
INITIATED,
@@ -76,8 +81,8 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
}
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml,
ByteBuf data, AddEntryCallback callback,
- Object ctx) {
- OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback,
ctx);
+ Object ctx, AtomicBoolean
timeoutTriggered) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback,
ctx, timeoutTriggered);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
@@ -85,8 +90,9 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
}
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml,
ByteBuf data, int numberOfMessages,
- AddEntryCallback callback,
Object ctx) {
- OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback,
ctx);
+ AddEntryCallback callback,
Object ctx,
+ AtomicBoolean
timeoutTriggered) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback,
ctx, timeoutTriggered);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -95,7 +101,8 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
}
private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl
ml, ByteBuf data,
- AddEntryCallback
callback, Object ctx) {
+ AddEntryCallback
callback, Object ctx,
+ AtomicBoolean
timeoutTriggered) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
@@ -109,6 +116,7 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
op.startTime = System.nanoTime();
op.state = State.OPEN;
op.payloadProcessorHandle = null;
+ op.timeoutTriggered = timeoutTriggered;
ml.mbean.addAddEntrySample(op.dataLength);
return op;
}
@@ -176,7 +184,9 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED,
State.COMPLETED)) {
log.warn("[{}] The add op is terminal legacy callback for entry
{}-{} adding.", ml.getName(), lh.getId(),
entryId);
- OpAddEntry.this.recycle();
+ // Since another thread may be copying this object, do not recycle
this object to avoid other problems.
+ // For example: if we recycled this object, the other thread could get a null
"opAddEntry.{variable_name}".
+ // Recycling is not mandatory; JVM GC will collect it.
return;
}
@@ -200,7 +210,7 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
lh == null ? -1 : lh.getId(), entryId, dataLength, rc);
}
- if (rc != BKException.Code.OK) {
+ if (rc != BKException.Code.OK || timeoutTriggered.get()) {
handleAddFailure(lh);
} else {
// Trigger addComplete callback in a thread hashed on the managed
ledger name
@@ -307,13 +317,6 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
return false;
}
- void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
- if (checkAndCompleteOp(ctx)) {
- this.close();
- this.handleAddFailure(ledger);
- }
- }
-
/**
* It handles add failure on the given ledger. it can be triggered when
add-entry fails or times out.
*
@@ -333,8 +336,11 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
});
}
- void close() {
+ OpAddEntry duplicateAndClose(AtomicBoolean timeoutTriggered) {
STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
+ OpAddEntry duplicate =
+ OpAddEntry.createNoRetainBuffer(ml, data,
getNumberOfMessages(), callback, ctx, timeoutTriggered);
+ return duplicate;
}
public State getState() {
@@ -389,6 +395,7 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable {
startTime = -1;
lastInitTime = -1;
payloadProcessorHandle = null;
+ timeoutTriggered = null;
recyclerHandle.recycle(this);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 8b2742d9587..ec5b006c474 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
@@ -54,6 +55,8 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
String name, final
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name,
mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
+ // ShadowManagedLedgerImpl does not implement add entry timeout yet,
so this variable will always be false.
+ this.currentLedgerTimeoutTriggered = new AtomicBoolean(false);
}
/**
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 122bada487a..4f521f1e99e 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -146,6 +146,7 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -3185,6 +3186,55 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testAddEntryResponseTimeout() throws Exception {
+ // Create ML with feature Add Entry Timeout Check.
+ final ManagedLedgerConfig config = new
ManagedLedgerConfig().setAddEntryTimeoutSeconds(2);
+ final ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("ml1", config);
+ final ManagedCursor cursor = ledger.openCursor("c1");
+ final CollectCtxAddEntryCallback collectCtxAddEntryCallback = new
CollectCtxAddEntryCallback();
+
+ // Insert a response delay.
+ bkc.addEntryResponseDelay(8, TimeUnit.SECONDS);
+
+ // Add two entries.
+ final byte[] msg1 = new byte[]{1};
+ final byte[] msg2 = new byte[]{2};
+ int ctx1 = 1;
+ int ctx2 = 2;
+ ledger.asyncAddEntry(msg1, collectCtxAddEntryCallback, ctx1);
+ ledger.asyncAddEntry(msg2, collectCtxAddEntryCallback, ctx2);
+ // Verify all write requests are completed.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(collectCtxAddEntryCallback.addCompleteCtxList,
Arrays.asList(1, 2));
+ });
+ Entry entry1 = cursor.readEntries(1).get(0);
+ assertEquals(entry1.getData(), msg1);
+ entry1.release();
+ Entry entry2 = cursor.readEntries(1).get(0);
+ assertEquals(entry2.getData(), msg2);
+ entry2.release();
+
+ // cleanup.
+ factory.delete(ledger.name);
+ }
+
+ private static class CollectCtxAddEntryCallback implements
AddEntryCallback {
+
+ public List<Object> addCompleteCtxList = new BlockingArrayQueue<>();
+ public List<Object> addFailedCtxList = new BlockingArrayQueue<>();
+
+ @Override
+ public void addComplete(Position position, ByteBuf entryData, Object
ctx) {
+ addCompleteCtxList.add(ctx);
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ addFailedCtxList.add(ctx);
+ }
+ }
+
/**
* It verifies that if bk-client doesn't complete the add-entry in given
time out then broker is resilient enough
* to create new ledger and add entry successfully.
@@ -3260,7 +3310,8 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+ OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
+ ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null,
new AtomicBoolean());
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index f0d279ef250..4516cfea01f 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
@@ -89,6 +90,7 @@ public class PulsarMockBookKeeper extends BookKeeper {
}
final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
+ final Queue<Long> addEntryResponseDelaysMillis = new
ConcurrentLinkedQueue<>();
final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
@@ -367,6 +369,11 @@ public class PulsarMockBookKeeper extends BookKeeper {
addEntryDelaysMillis.add(unit.toMillis(delay));
}
+ public synchronized void addEntryResponseDelay(long delay, TimeUnit unit) {
+ checkArgument(delay >= 0, "The delay time must not be negative.");
+ addEntryResponseDelaysMillis.add(unit.toMillis(delay));
+ }
+
static int getExceptionCode(Throwable t) {
if (t instanceof BKException) {
return ((BKException) t).getCode();
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index dea33a0e676..aa61e541d0d 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -197,6 +197,13 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this,
LedgerHandle.INVALID_ENTRY_ID, ctx);
} else {
+ Long responseDelayMillis =
bk.addEntryResponseDelaysMillis.poll();
+ if (responseDelayMillis != null) {
+ try {
+ Thread.sleep(responseDelayMillis);
+ } catch (InterruptedException e) {
+ }
+ }
cb.addComplete(BKException.Code.OK,
PulsarMockLedgerHandle.this, entryId, ctx);
}
}, bk.executor);