This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 33cb58678 Revert "RATIS-2080. Reuse LeaderElection executor. (#1082)"
33cb58678 is described below

commit 33cb58678b499ecc5816079e5f93bf03debc8b48
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri May 10 13:49:19 2024 -0700

    Revert "RATIS-2080. Reuse LeaderElection executor. (#1082)"
    
    This reverts commit 8c9c801e5b8edc68971911e5819e89e74e114683.
---
 .../apache/ratis/server/impl/LeaderElection.java   | 53 ++++++++++++++--------
 .../apache/ratis/server/impl/RaftServerImpl.java   | 24 ++--------
 2 files changed, 37 insertions(+), 40 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 2f672c17e..d738c8757 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -42,16 +42,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -131,22 +130,28 @@ class LeaderElection implements Runnable {
   }
 
   static class Executor {
+    private final ExecutorCompletionService<RequestVoteReplyProto> service;
     private final ExecutorService executor;
 
     private final AtomicInteger count = new AtomicInteger();
 
     Executor(Object name, int size) {
       Preconditions.assertTrue(size > 0);
-      executor = Executors.newCachedThreadPool(r ->
+      executor = Executors.newFixedThreadPool(size, r ->
           Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build());
+      service = new ExecutorCompletionService<>(executor);
     }
 
-    ExecutorService getExecutor() {
-      return executor;
+    void shutdown() {
+      executor.shutdown();
     }
 
-    Future<RequestVoteReplyProto> submit(Callable<RequestVoteReplyProto> task) {
-      return executor.submit(task);
+    void submit(Callable<RequestVoteReplyProto> task) {
+      service.submit(task);
+    }
+
+    Future<RequestVoteReplyProto> poll(TimeDuration waitTime) throws InterruptedException {
+      return service.poll(waitTime.getDuration(), waitTime.getUnit());
     }
   }
 
@@ -285,9 +290,13 @@ class LeaderElection implements Runnable {
       r = new ResultAndTerm(Result.PASSED, electionTerm);
     } else {
       final TermIndex lastEntry = server.getState().getLastEntry();
-      final List<Future<RequestVoteReplyProto>> submitted = submitRequests(
-          phase, electionTerm, lastEntry, others, server.getLeaderElectionExecutor());
-      r = waitForResults(phase, electionTerm, submitted, conf);
+      final Executor voteExecutor = new Executor(this, others.size());
+      try {
+        final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
+        r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
+      } finally {
+        voteExecutor.shutdown();
+      }
     }
 
     return r;
@@ -339,13 +348,14 @@ class LeaderElection implements Runnable {
     }
   }
 
-  private List<Future<RequestVoteReplyProto>> submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
+  private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
       Collection<RaftPeer> others, Executor voteExecutor) {
-    final List<Future<RequestVoteReplyProto>> submitted = new ArrayList<>(others.size());
+    int submitted = 0;
     for (final RaftPeer peer : others) {
       final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
           server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
-      submitted.add(voteExecutor.submit(() -> server.getServerRpc().requestVote(r)));
+      voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
+      submitted++;
     }
     return submitted;
   }
@@ -359,17 +369,18 @@ class LeaderElection implements Runnable {
         .collect(Collectors.toSet());
   }
 
-  private ResultAndTerm waitForResults(Phase phase, long electionTerm, List<Future<RequestVoteReplyProto>> submitted,
-      RaftConfigurationImpl conf) throws InterruptedException {
+  private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
+      RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
     final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
     final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
     final List<Exception> exceptions = new ArrayList<>();
+    int waitForNum = submitted;
     Collection<RaftPeerId> votedPeers = new ArrayList<>();
     Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
     Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
     final boolean singleMode = conf.isSingleMode(server.getId());
 
-    for(Iterator<Future<RequestVoteReplyProto>> i = submitted.iterator(); i.hasNext() && shouldRun(electionTerm); ) {
+    while (waitForNum > 0 && shouldRun(electionTerm)) {
       final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
       if (waitTime.isNonPositive()) {
         if (conf.hasMajority(votedPeers, server.getId())) {
@@ -385,9 +396,12 @@ class LeaderElection implements Runnable {
       }
 
       try {
-        final Future<RequestVoteReplyProto> future = i.next();
-        final RequestVoteReplyProto r = future.get(waitTime.getDuration(), waitTime.getUnit());
+        final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
+        if (future == null) {
+          continue; // poll timeout, continue to return Result.TIMEOUT
+        }
 
+        final RequestVoteReplyProto r = future.get();
         final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
         final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
         if (previous != null) {
@@ -430,9 +444,8 @@ class LeaderElection implements Runnable {
       } catch(ExecutionException e) {
         LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
         exceptions.add(e);
-      } catch (TimeoutException e) {
-        // get timeout, continue to return Result.TIMEOUT
       }
+      waitForNum--;
     }
     // received all the responses
     if (conf.hasMajority(votedPeers, server.getId())) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c6a27ba8c..4512a2c22 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -243,7 +243,6 @@ class RaftServerImpl implements RaftServer.Division,
 
   private final ExecutorService serverExecutor;
   private final ExecutorService clientExecutor;
-  private final MemoizedSupplier<LeaderElection.Executor> leaderElectionExecutor;
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
   private final ThreadGroup threadGroup;
@@ -282,17 +281,14 @@ class RaftServerImpl implements RaftServer.Division,
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
     this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
 
-    final RaftGroupMemberId memberId = getMemberId();
     this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.serverCached(properties),
         RaftServerConfigKeys.ThreadPool.serverSize(properties),
-        memberId + "-server");
+        id + "-server");
     this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.clientCached(properties),
         RaftServerConfigKeys.ThreadPool.clientSize(properties),
-        memberId + "-client");
-    this.leaderElectionExecutor = MemoizedSupplier.valueOf(
-        () -> new LeaderElection.Executor(memberId + "-election" , group.getPeers().size()));
+        id + "-client");
   }
 
   private long getCommitIndex(RaftPeerId id) {
@@ -538,20 +534,12 @@ class RaftServerImpl implements RaftServer.Division,
       try {
         ConcurrentUtils.shutdownAndWait(clientExecutor);
       } catch (Exception e) {
-        LOG.warn("{}: Failed to shutdown clientExecutor", getMemberId(), e);
+        LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
       }
       try {
         ConcurrentUtils.shutdownAndWait(serverExecutor);
       } catch (Exception e) {
-        LOG.warn("{}: Failed to shutdown serverExecutor", getMemberId(), e);
-      }
-
-      if (leaderElectionExecutor.isInitialized()) {
-        try {
-          ConcurrentUtils.shutdownAndWait(leaderElectionExecutor.get().getExecutor());
-        } catch (Exception e) {
-          LOG.warn("{}: Failed to shutdown leaderElectionExecutor", getMemberId(), e);
-        }
+        LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
       }
     });
   }
@@ -1537,10 +1525,6 @@ class RaftServerImpl implements RaftServer.Division,
     return serverExecutor;
   }
 
-  LeaderElection.Executor getLeaderElectionExecutor() {
-    return leaderElectionExecutor.get();
-  }
-
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
       TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
     final AppendEntriesRequestProto proto = requestRef.get();

Reply via email to