This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 2afee937948ac5eb2ec5f6193e8cfe137168b92b
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sun Aug 7 08:28:09 2022 +0200

    RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)

    (cherry picked from commit 9bbb4401b832a69035bf0b186bb9525bf6aadeb9)
---
 .../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
 .../main/java/org/apache/ratis/server/impl/RaftServerProxy.java | 4 +++-
 .../java/org/apache/ratis/InstallSnapshotNotificationTests.java | 7 ++++++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe9f87ce9..6ab0a6a5c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1414,7 +1414,8 @@ class RaftServerImpl implements RaftServer.Division,
       }
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
-        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
+        serverExecutor
     ).thenApply(v -> {
       final AppendEntriesReplyProto reply;
       synchronized(this) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 96f7efbe1..ad4d988ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -115,7 +115,9 @@ class RaftServerProxy implements RaftServer {
         return;
       }
       isClosed = true;
-      map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue()));
+      ConcurrentUtils.parallelForEachAsync(map.entrySet(),
+          entry -> close(entry.getKey(), entry.getValue()),
+          executor);
     }
 
     private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 4476f3ecf..215e8408f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -51,6 +51,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -85,6 +87,9 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   private static final AtomicInteger numNotifyInstallSnapshotFinished = new AtomicInteger();
 
   private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
+
+    private final Executor stateMachineExecutor = Executors.newSingleThreadExecutor();
+
     @Override
     public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
         RaftProtos.RoleInfoProto roleInfoProto,
@@ -120,7 +125,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
         return leaderSnapshotInfo.getTermIndex();
       };
 
-      return CompletableFuture.supplyAsync(supplier);
+      return CompletableFuture.supplyAsync(supplier, stateMachineExecutor);
     }
 
     @Override
