This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9bbb4401b RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in
RaftServerImpl (#702)
9bbb4401b is described below
commit 9bbb4401b832a69035bf0b186bb9525bf6aadeb9
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sun Aug 7 08:28:09 2022 +0200
RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl
(#702)
---
.../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
.../main/java/org/apache/ratis/server/impl/RaftServerProxy.java | 4 +++-
.../java/org/apache/ratis/InstallSnapshotNotificationTests.java | 7 ++++++-
3 files changed, 11 insertions(+), 3 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe9f87ce9..6ab0a6a5c 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1414,7 +1414,8 @@ class RaftServerImpl implements RaftServer.Division,
}
}
return JavaUtils.allOf(futures).whenCompleteAsync(
- (r, t) -> followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+ (r, t) -> followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
+ serverExecutor
).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 96f7efbe1..ad4d988ab 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -115,7 +115,9 @@ class RaftServerProxy implements RaftServer {
return;
}
isClosed = true;
- map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(),
entry.getValue()));
+ ConcurrentUtils.parallelForEachAsync(map.entrySet(),
+ entry -> close(entry.getKey(), entry.getValue()),
+ executor);
}
private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl>
future) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 4476f3ecf..215e8408f 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -51,6 +51,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -85,6 +87,9 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
private static final AtomicInteger numNotifyInstallSnapshotFinished = new
AtomicInteger();
private static class StateMachine4InstallSnapshotNotificationTests extends
SimpleStateMachine4Testing {
+
+ private final Executor stateMachineExecutor =
Executors.newSingleThreadExecutor();
+
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto,
@@ -120,7 +125,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
return leaderSnapshotInfo.getTermIndex();
};
- return CompletableFuture.supplyAsync(supplier);
+ return CompletableFuture.supplyAsync(supplier, stateMachineExecutor);
}
@Override