This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c295f1880f Made PendingAddOp thread safe (#3784)
c295f1880f is described below
commit c295f1880f6c9c18ceb9bc985db9c88e636b9171
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Feb 21 12:52:03 2023 -0800
Made PendingAddOp thread safe (#3784)
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 11 ++---
.../apache/bookkeeper/client/LedgerHandleAdv.java | 8 +---
.../org/apache/bookkeeper/client/PendingAddOp.java | 50 ++++++++--------------
.../apache/bookkeeper/proto/BookieClientImpl.java | 15 +------
.../bookkeeper/proto/PerChannelBookieClient.java | 4 +-
.../apache/bookkeeper/client/PendingAddOpTest.java | 2 +-
6 files changed, 26 insertions(+), 64 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 99936705be..04270c72f5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1337,7 +1337,7 @@ public class LedgerHandle implements WriteHandle {
});
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
-
BKException.Code.InterruptedException),
+ BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
op.recyclePendAddOpObject();
}
@@ -1355,13 +1355,8 @@ public class LedgerHandle implements WriteHandle {
}
}
- try {
- executeOrdered(op);
- } catch (RejectedExecutionException e) {
- op.cb.addCompleteWithLatency(
- BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
- LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
- }
+ op.initiate();
+
}
synchronized void updateLastConfirmed(long lac, long len) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 92bddc9cc4..c94a9154f5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -275,13 +275,7 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
}
}
- try {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
- } catch (RejectedExecutionException e) {
-
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
-
BKException.Code.InterruptedException),
- LedgerHandleAdv.this, op.getEntryId(), 0,
op.ctx);
- }
+ op.initiate();
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d0ff59e45c..8dc5037019 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -32,7 +32,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
import org.apache.bookkeeper.client.api.WriteFlag;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
*
*
*/
-class PendingAddOp implements Runnable, WriteCallback {
+class PendingAddOp implements WriteCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingAddOp.class);
ByteBuf payload;
@@ -68,7 +67,7 @@ class PendingAddOp implements Runnable, WriteCallback {
LedgerHandle lh;
ClientContext clientCtx;
boolean isRecoveryAdd = false;
- long requestTimeNanos;
+ volatile long requestTimeNanos;
long qwcLatency; // Quorum Write Completion Latency after response from
quorum bookies.
Set<BookieId> addEntrySuccessBookies;
long writeDelayedStartTime; // min fault domains completion latency after
response from ack quorum bookies
@@ -143,7 +142,7 @@ class PendingAddOp implements Runnable, WriteCallback {
return this.entryId;
}
- void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
+ private void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY :
FLAG_NONE;
clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
@@ -160,32 +159,22 @@ class PendingAddOp implements Runnable, WriteCallback {
return false;
}
- void timeoutQuorumWait() {
- try {
- clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new
Runnable() {
- @Override
- public void run() {
- if (completed) {
- return;
- } else if (addEntrySuccessBookies.size() >=
lh.getLedgerMetadata().getAckQuorumSize()) {
- // If ackQuorum number of bookies have acknowledged
the write but still not complete, indicates
- // failures due to not having been written to enough
fault domains. Increment corresponding
- // counter.
-
clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
- }
-
lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
- }
- @Override
- public String toString() {
- return String.format("AddEntryQuorumTimeout(lid=%d,
eid=%d)", lh.ledgerId, entryId);
- }
- });
- } catch (RejectedExecutionException e) {
- LOG.warn("Timeout add entry quorum wait failed {} entry: {}",
lh.ledgerId, entryId);
+ synchronized void timeoutQuorumWait() {
+ if (completed) {
+ return;
}
+
+ if (addEntrySuccessBookies.size() >=
lh.getLedgerMetadata().getAckQuorumSize()) {
+ // If ackQuorum number of bookies have acknowledged the write but
still not complete, indicates
+ // failures due to not having been written to enough fault
domains. Increment corresponding
+ // counter.
+
clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
+ }
+
+
lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
}
- void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int
bookieIndex) {
+ synchronized void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble,
int bookieIndex) {
// update the ensemble
this.ensemble = ensemble;
@@ -242,8 +231,7 @@ class PendingAddOp implements Runnable, WriteCallback {
/**
* Initiate the add operation.
*/
- @Override
- public void run() {
+ public void initiate() {
hasRun = true;
if (callbackTriggered) {
// this should only be true if the request was failed due
@@ -280,7 +268,7 @@ class PendingAddOp implements Runnable, WriteCallback {
}
@Override
- public void writeComplete(int rc, long ledgerId, long entryId, BookieId
addr, Object ctx) {
+ public synchronized void writeComplete(int rc, long ledgerId, long
entryId, BookieId addr, Object ctx) {
int bookieIndex = (Integer) ctx;
--pendingWriteRequests;
@@ -410,7 +398,7 @@ class PendingAddOp implements Runnable, WriteCallback {
lh.sendAddSuccessCallbacks();
}
- void submitCallback(final int rc) {
+ synchronized void submitCallback(final int rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(),
entryId, rc);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index eaecb45282..43374776d4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -280,20 +280,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
final BookieId addr,
final WriteCallback cb,
final Object ctx) {
- try {
- executor.executeOrdered(ledgerId, new Runnable() {
- @Override
- public void run() {
- cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
- }
- @Override
- public String toString() {
- return String.format("CompleteWrite(ledgerId=%d,
entryId=%d, addr=%s)", ledgerId, entryId, addr);
- }
- });
- } catch (RejectedExecutionException ree) {
- cb.writeComplete(getRc(BKException.Code.InterruptedException),
ledgerId, entryId, addr, ctx);
- }
+ cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 888077fe80..aea77fba29 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -778,9 +778,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (useV2WireProtocol) {
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
- executor.executeOrdered(ledgerId, () -> {
- cb.writeComplete(BKException.Code.IllegalOpException,
ledgerId, entryId, bookieId, ctx);
- });
+ cb.writeComplete(BKException.Code.IllegalOpException,
ledgerId, entryId, bookieId, ctx);
return;
}
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
index 51d296c900..5fb318c51f 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
@@ -82,7 +82,7 @@ public class PendingAddOpTest {
assertSame(lh, op.lh);
assertEquals(Code.NotEnoughBookiesException, rcHolder.get());
- op.run();
+ op.initiate();
// after the op is run, the object is recycled.
assertNull(op.lh);
}