This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c295f1880f Made PendingAddOp thread safe (#3784)
c295f1880f is described below

commit c295f1880f6c9c18ceb9bc985db9c88e636b9171
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Feb 21 12:52:03 2023 -0800

    Made PendingAddOp thread safe (#3784)
---
 .../org/apache/bookkeeper/client/LedgerHandle.java | 11 ++---
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  8 +---
 .../org/apache/bookkeeper/client/PendingAddOp.java | 50 ++++++++--------------
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 15 +------
 .../bookkeeper/proto/PerChannelBookieClient.java   |  4 +-
 .../apache/bookkeeper/client/PendingAddOpTest.java |  2 +-
 6 files changed, 26 insertions(+), 64 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 99936705be..04270c72f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1337,7 +1337,7 @@ public class LedgerHandle implements WriteHandle {
                 });
             } catch (RejectedExecutionException e) {
                 op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
-                                                                    BKException.Code.InterruptedException),
+                                BKException.Code.InterruptedException),
                         LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
                 op.recyclePendAddOpObject();
             }
@@ -1355,13 +1355,8 @@ public class LedgerHandle implements WriteHandle {
             }
         }
 
-        try {
-            executeOrdered(op);
-        } catch (RejectedExecutionException e) {
-            op.cb.addCompleteWithLatency(
-                    BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
-                    LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
-        }
+        op.initiate();
+
     }
 
     synchronized void updateLastConfirmed(long lac, long len) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 92bddc9cc4..c94a9154f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -275,13 +275,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
             }
         }
 
-        try {
-            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
-        } catch (RejectedExecutionException e) {
-            op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
-                                                                BKException.Code.InterruptedException),
-                              LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
-        }
+        op.initiate();
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d0ff59e45c..8dc5037019 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -32,7 +32,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.api.WriteFlag;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
  *
  *
  */
-class PendingAddOp implements Runnable, WriteCallback {
+class PendingAddOp implements WriteCallback {
     private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);
 
     ByteBuf payload;
@@ -68,7 +67,7 @@ class PendingAddOp implements Runnable, WriteCallback {
     LedgerHandle lh;
     ClientContext clientCtx;
     boolean isRecoveryAdd = false;
-    long requestTimeNanos;
+    volatile long requestTimeNanos;
     long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
     Set<BookieId> addEntrySuccessBookies;
     long writeDelayedStartTime; // min fault domains completion latency after response from ack quorum bookies
@@ -143,7 +142,7 @@ class PendingAddOp implements Runnable, WriteCallback {
         return this.entryId;
     }
 
-    void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
+    private void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
 
         clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
@@ -160,32 +159,22 @@ class PendingAddOp implements Runnable, WriteCallback {
         return false;
     }
 
-    void timeoutQuorumWait() {
-        try {
-            clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new Runnable() {
-                @Override
-                public void run() {
-                    if (completed) {
-                        return;
-                    } else if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) {
-                        // If ackQuorum number of bookies have acknowledged the write but still not complete, indicates
-                        // failures due to not having been written to enough fault domains. Increment corresponding
-                        // counter.
-                        clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
-                    }
-                    lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
-                }
-                @Override
-                public String toString() {
-                    return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId, entryId);
-                }
-            });
-        } catch (RejectedExecutionException e) {
-            LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId);
+    synchronized void timeoutQuorumWait() {
+        if (completed) {
+            return;
         }
+
+        if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) {
+            // If ackQuorum number of bookies have acknowledged the write but still not complete, indicates
+            // failures due to not having been written to enough fault domains. Increment corresponding
+            // counter.
+            clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
+        }
+
+        lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
     }
 
-    void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
+    synchronized void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
         // update the ensemble
         this.ensemble = ensemble;
 
@@ -242,8 +231,7 @@ class PendingAddOp implements Runnable, WriteCallback {
     /**
      * Initiate the add operation.
      */
-    @Override
-    public void run() {
+    public void initiate() {
         hasRun = true;
         if (callbackTriggered) {
             // this should only be true if the request was failed due
@@ -280,7 +268,7 @@ class PendingAddOp implements Runnable, WriteCallback {
     }
 
     @Override
-    public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
+    public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
         int bookieIndex = (Integer) ctx;
         --pendingWriteRequests;
 
@@ -410,7 +398,7 @@ class PendingAddOp implements Runnable, WriteCallback {
         lh.sendAddSuccessCallbacks();
     }
 
-    void submitCallback(final int rc) {
+    synchronized void submitCallback(final int rc) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), entryId, rc);
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index eaecb45282..43374776d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -280,20 +280,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
                              final BookieId addr,
                              final WriteCallback cb,
                              final Object ctx) {
-        try {
-            executor.executeOrdered(ledgerId, new Runnable() {
-                @Override
-                public void run() {
-                    cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
-                }
-                @Override
-                public String toString() {
-                    return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
-                }
-            });
-        } catch (RejectedExecutionException ree) {
-            cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
-        }
+        cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 888077fe80..aea77fba29 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -778,9 +778,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         if (useV2WireProtocol) {
             if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
                 LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
-                executor.executeOrdered(ledgerId, () -> {
-                    cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
-                });
+                cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
                 return;
             }
             completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
index 51d296c900..5fb318c51f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
@@ -82,7 +82,7 @@ public class PendingAddOpTest {
         assertSame(lh, op.lh);
         assertEquals(Code.NotEnoughBookiesException, rcHolder.get());
 
-        op.run();
+        op.initiate();
         // after the op is run, the object is recycled.
         assertNull(op.lh);
     }

Reply via email to