This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new d85e99057b Pin order executor in LedgerHandle (#3738)
d85e99057b is described below
commit d85e99057bbc99deb083f91ebfc27d817079484b
Author: houxiaoyu <[email protected]>
AuthorDate: Thu Jul 13 14:15:20 2023 +0800
Pin order executor in LedgerHandle (#3738)
---
.../java/org/apache/bookkeeper/client/LedgerHandle.java | 5 ++++-
.../org/apache/bookkeeper/client/MockBookKeeper.java | 17 ++++++++++++-----
.../org/apache/bookkeeper/client/MockLedgerHandle.java | 11 +++--------
.../org/apache/bookkeeper/client/MockReadHandle.java | 2 +-
4 files changed, 20 insertions(+), 15 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index ea5295dc7a..945b284437 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -46,6 +46,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -100,6 +101,7 @@ public class LedgerHandle implements WriteHandle {
final byte[] ledgerKey;
private Versioned<LedgerMetadata> versionedMetadata;
final long ledgerId;
+ final ExecutorService executor;
long lastAddPushed;
private enum HandleState {
@@ -194,6 +196,7 @@ public class LedgerHandle implements WriteHandle {
this.pendingAddsSequenceHead = lastAddConfirmed;
this.ledgerId = ledgerId;
+ this.executor = clientCtx.getMainWorkerPool().chooseThread(ledgerId);
if (clientCtx.getConf().enableStickyReads
&& getLedgerMetadata().getEnsembleSize() ==
getLedgerMetadata().getWriteQuorumSize()) {
@@ -2081,7 +2084,7 @@ public class LedgerHandle implements WriteHandle {
* @throws RejectedExecutionException
*/
void executeOrdered(Runnable runnable) throws RejectedExecutionException {
- clientCtx.getMainWorkerPool().executeOrdered(ledgerId, runnable);
+ executor.execute(runnable);
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index b7149b384f..da1525ab8d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -25,8 +25,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +35,7 @@ import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.OpenBuilderBase;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -49,7 +48,10 @@ import org.slf4j.LoggerFactory;
*/
public class MockBookKeeper extends BookKeeper {
- final ExecutorService executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-bookkeeper"));
+ final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
+ .numThreads(1)
+ .threadFactory(new DefaultThreadFactory("mock-bookkeeper"))
+ .build();
final ZooKeeper zkc;
@Override
@@ -68,6 +70,11 @@ public class MockBookKeeper extends BookKeeper {
this.zkc = zkc;
}
+ @Override
+ public OrderedExecutor getMainWorkerPool() {
+ return orderedExecutor;
+ }
+
@Override
public LedgerHandle createLedger(DigestType digestType, byte[] passwd)
throws BKException {
return createLedger(3, 2, digestType, passwd);
@@ -86,7 +93,7 @@ public class MockBookKeeper extends BookKeeper {
return;
}
- executor.execute(new Runnable() {
+ orderedExecutor.chooseThread().execute(new Runnable() {
public void run() {
if (getProgrammedFailStatus()) {
if (failReturnCode != BkTimeoutOperation) {
@@ -256,7 +263,7 @@ public class MockBookKeeper extends BookKeeper {
}
ledgers.clear();
- executor.shutdownNow();
+ orderedExecutor.shutdownNow();
}
public boolean isStopped() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index 73f9cfa253..f693c78783 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -82,18 +82,13 @@ public class MockLedgerHandle extends LedgerHandle {
fenced = true;
try {
- bk.executor.execute(() -> cb.closeComplete(0, this, ctx));
+ executeOrdered(() -> cb.closeComplete(0, this, ctx));
} catch (RejectedExecutionException e) {
cb.closeComplete(0, this, ctx);
}
}
- @Override
- void executeOrdered(Runnable runnable) throws RejectedExecutionException {
- bk.executor.execute(runnable);
- }
-
@Override
public void asyncReadEntries(final long firstEntry, final long lastEntry,
final ReadCallback cb, final Object ctx) {
if (bk.isStopped()) {
@@ -101,7 +96,7 @@ public class MockLedgerHandle extends LedgerHandle {
return;
}
- bk.executor.execute(new Runnable() {
+ executeOrdered(new Runnable() {
public void run() {
if (bk.getProgrammedFailStatus()) {
cb.readComplete(bk.failReturnCode, MockLedgerHandle.this,
null, ctx);
@@ -188,7 +183,7 @@ public class MockLedgerHandle extends LedgerHandle {
}
data.retain();
- bk.executor.execute(new Runnable() {
+ executeOrdered(new Runnable() {
public void run() {
if (bk.getProgrammedFailStatus()) {
fenced = true;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
index b180aec106..fac6192a53 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockReadHandle.java
@@ -56,7 +56,7 @@ class MockReadHandle implements ReadHandle {
return promise;
}
- bk.executor.execute(() -> {
+ bk.orderedExecutor.chooseThread().execute(() -> {
if (bk.getProgrammedFailStatus()) {
promise.completeExceptionally(BKException.create(bk.failReturnCode));
return;