This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d85e99057b Pin order executor in LedgerHandle (#3738)
d85e99057b is described below

commit d85e99057bbc99deb083f91ebfc27d817079484b
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Jul 13 14:15:20 2023 +0800

    Pin order executor in LedgerHandle (#3738)
---
 .../java/org/apache/bookkeeper/client/LedgerHandle.java |  5 ++++-
 .../org/apache/bookkeeper/client/MockBookKeeper.java    | 17 ++++++++++++-----
 .../org/apache/bookkeeper/client/MockLedgerHandle.java  | 11 +++--------
 .../org/apache/bookkeeper/client/MockReadHandle.java    |  2 +-
 4 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index ea5295dc7a..945b284437 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -46,6 +46,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -100,6 +101,7 @@ public class LedgerHandle implements WriteHandle {
     final byte[] ledgerKey;
     private Versioned<LedgerMetadata> versionedMetadata;
     final long ledgerId;
+    final ExecutorService executor;
     long lastAddPushed;
 
     private enum HandleState {
@@ -194,6 +196,7 @@ public class LedgerHandle implements WriteHandle {
         this.pendingAddsSequenceHead = lastAddConfirmed;
 
         this.ledgerId = ledgerId;
+        this.executor = clientCtx.getMainWorkerPool().chooseThread(ledgerId);
 
         if (clientCtx.getConf().enableStickyReads
                 && getLedgerMetadata().getEnsembleSize() == getLedgerMetadata().getWriteQuorumSize()) {
@@ -2081,7 +2084,7 @@ public class LedgerHandle implements WriteHandle {
      * @throws RejectedExecutionException
      */
     void executeOrdered(Runnable runnable) throws RejectedExecutionException {
-        clientCtx.getMainWorkerPool().executeOrdered(ledgerId, runnable);
+        executor.execute(runnable);
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index b7149b384f..da1525ab8d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -25,8 +25,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +35,7 @@ import org.apache.bookkeeper.client.api.BKException.Code;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.OpenBuilderBase;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -49,7 +48,10 @@ import org.slf4j.LoggerFactory;
  */
 public class MockBookKeeper extends BookKeeper {
 
-    final ExecutorService executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-bookkeeper"));
+    final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+            .numThreads(1)
+            .threadFactory(new DefaultThreadFactory("mock-bookkeeper"))
+            .build();
     final ZooKeeper zkc;
 
     @Override
@@ -68,6 +70,11 @@ public class MockBookKeeper extends BookKeeper {
         this.zkc = zkc;
     }
 
+    @Override
+    public OrderedExecutor getMainWorkerPool() {
+        return orderedExecutor;
+    }
+
     @Override
     public LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException {
         return createLedger(3, 2, digestType, passwd);
@@ -86,7 +93,7 @@ public class MockBookKeeper extends BookKeeper {
             return;
         }
 
-        executor.execute(new Runnable() {
+        orderedExecutor.chooseThread().execute(new Runnable() {
             public void run() {
                 if (getProgrammedFailStatus()) {
                     if (failReturnCode != BkTimeoutOperation) {
@@ -256,7 +263,7 @@ public class MockBookKeeper extends BookKeeper {
         }
 
         ledgers.clear();
-        executor.shutdownNow();
+        orderedExecutor.shutdownNow();
     }
 
     public boolean isStopped() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index 73f9cfa253..f693c78783 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -82,18 +82,13 @@ public class MockLedgerHandle extends LedgerHandle {
 
         fenced = true;
         try {
-            bk.executor.execute(() -> cb.closeComplete(0, this, ctx));
+            executeOrdered(() -> cb.closeComplete(0, this, ctx));
         } catch (RejectedExecutionException e) {
             cb.closeComplete(0, this, ctx);
         }
 
     }
 
-    @Override
-    void executeOrdered(Runnable runnable) throws RejectedExecutionException {
-        bk.executor.execute(runnable);
-    }
-
     @Override
     public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) {
         if (bk.isStopped()) {
@@ -101,7 +96,7 @@ public class MockLedgerHandle extends LedgerHandle {
             return;
         }
 
-        bk.executor.execute(new Runnable() {
+        executeOrdered(new Runnable() {
             public void run() {
                 if (bk.getProgrammedFailStatus()) {
                     cb.readComplete(bk.failReturnCode, MockLedgerHandle.this, null, ctx);
@@ -188,7 +183,7 @@ public class MockLedgerHandle extends LedgerHandle {
         }
 
         data.retain();
-        bk.executor.execute(new Runnable() {
+        executeOrdered(new Runnable() {
             public void run() {
                 if (bk.getProgrammedFailStatus()) {
                     fenced = true;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
index b180aec106..fac6192a53 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
@@ -56,7 +56,7 @@ class MockReadHandle implements ReadHandle {
             return promise;
         }
 
-        bk.executor.execute(() -> {
+        bk.orderedExecutor.chooseThread().execute(() -> {
                 if (bk.getProgrammedFailStatus()) {
                     promise.completeExceptionally(BKException.create(bk.failReturnCode));
                     return;

Reply via email to