This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 33cb58678 Revert "RATIS-2080. Reuse LeaderElection executor. (#1082)"
33cb58678 is described below
commit 33cb58678b499ecc5816079e5f93bf03debc8b48
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri May 10 13:49:19 2024 -0700
Revert "RATIS-2080. Reuse LeaderElection executor. (#1082)"
This reverts commit 8c9c801e5b8edc68971911e5819e89e74e114683.
---
.../apache/ratis/server/impl/LeaderElection.java | 53 ++++++++++++++--------
.../apache/ratis/server/impl/RaftServerImpl.java | 24 ++--------
2 files changed, 37 insertions(+), 40 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 2f672c17e..d738c8757 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -42,16 +42,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;
@@ -131,22 +130,28 @@ class LeaderElection implements Runnable {
}
static class Executor {
+ private final ExecutorCompletionService<RequestVoteReplyProto> service;
private final ExecutorService executor;
private final AtomicInteger count = new AtomicInteger();
Executor(Object name, int size) {
Preconditions.assertTrue(size > 0);
- executor = Executors.newCachedThreadPool(r ->
+ executor = Executors.newFixedThreadPool(size, r ->
Daemon.newBuilder().setName(name + "-" +
count.incrementAndGet()).setRunnable(r).build());
+ service = new ExecutorCompletionService<>(executor);
}
- ExecutorService getExecutor() {
- return executor;
+ void shutdown() {
+ executor.shutdown();
}
- Future<RequestVoteReplyProto> submit(Callable<RequestVoteReplyProto> task)
{
- return executor.submit(task);
+ void submit(Callable<RequestVoteReplyProto> task) {
+ service.submit(task);
+ }
+
+ Future<RequestVoteReplyProto> poll(TimeDuration waitTime) throws
InterruptedException {
+ return service.poll(waitTime.getDuration(), waitTime.getUnit());
}
}
@@ -285,9 +290,13 @@ class LeaderElection implements Runnable {
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
final TermIndex lastEntry = server.getState().getLastEntry();
- final List<Future<RequestVoteReplyProto>> submitted = submitRequests(
- phase, electionTerm, lastEntry, others,
server.getLeaderElectionExecutor());
- r = waitForResults(phase, electionTerm, submitted, conf);
+ final Executor voteExecutor = new Executor(this, others.size());
+ try {
+ final int submitted = submitRequests(phase, electionTerm, lastEntry,
others, voteExecutor);
+ r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
+ } finally {
+ voteExecutor.shutdown();
+ }
}
return r;
@@ -339,13 +348,14 @@ class LeaderElection implements Runnable {
}
}
- private List<Future<RequestVoteReplyProto>> submitRequests(Phase phase, long
electionTerm, TermIndex lastEntry,
+ private int submitRequests(Phase phase, long electionTerm, TermIndex
lastEntry,
Collection<RaftPeer> others, Executor voteExecutor) {
- final List<Future<RequestVoteReplyProto>> submitted = new
ArrayList<>(others.size());
+ int submitted = 0;
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r =
ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase
== Phase.PRE_VOTE);
- submitted.add(voteExecutor.submit(() ->
server.getServerRpc().requestVote(r)));
+ voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
+ submitted++;
}
return submitted;
}
@@ -359,17 +369,18 @@ class LeaderElection implements Runnable {
.collect(Collectors.toSet());
}
- private ResultAndTerm waitForResults(Phase phase, long electionTerm,
List<Future<RequestVoteReplyProto>> submitted,
- RaftConfigurationImpl conf) throws InterruptedException {
+ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int
submitted,
+ RaftConfigurationImpl conf, Executor voteExecutor) throws
InterruptedException {
final Timestamp timeout =
Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
+ int waitForNum = submitted;
Collection<RaftPeerId> votedPeers = new ArrayList<>();
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
final boolean singleMode = conf.isSingleMode(server.getId());
- for(Iterator<Future<RequestVoteReplyProto>> i = submitted.iterator();
i.hasNext() && shouldRun(electionTerm); ) {
+ while (waitForNum > 0 && shouldRun(electionTerm)) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
if (waitTime.isNonPositive()) {
if (conf.hasMajority(votedPeers, server.getId())) {
@@ -385,9 +396,12 @@ class LeaderElection implements Runnable {
}
try {
- final Future<RequestVoteReplyProto> future = i.next();
- final RequestVoteReplyProto r = future.get(waitTime.getDuration(),
waitTime.getUnit());
+ final Future<RequestVoteReplyProto> future =
voteExecutor.poll(waitTime);
+ if (future == null) {
+ continue; // poll timeout, continue to return Result.TIMEOUT
+ }
+ final RequestVoteReplyProto r = future.get();
final RaftPeerId replierId =
RaftPeerId.valueOf(r.getServerReply().getReplyId());
final RequestVoteReplyProto previous =
responses.putIfAbsent(replierId, r);
if (previous != null) {
@@ -430,9 +444,8 @@ class LeaderElection implements Runnable {
} catch(ExecutionException e) {
LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting
votes", e);
exceptions.add(e);
- } catch (TimeoutException e) {
- // get timeout, continue to return Result.TIMEOUT
}
+ waitForNum--;
}
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c6a27ba8c..4512a2c22 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -243,7 +243,6 @@ class RaftServerImpl implements RaftServer.Division,
private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;
- private final MemoizedSupplier<LeaderElection.Executor>
leaderElectionExecutor;
private final AtomicBoolean firstElectionSinceStartup = new
AtomicBoolean(true);
private final ThreadGroup threadGroup;
@@ -282,17 +281,14 @@ class RaftServerImpl implements RaftServer.Division,
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this,
properties);
- final RaftGroupMemberId memberId = getMemberId();
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
RaftServerConfigKeys.ThreadPool.serverSize(properties),
- memberId + "-server");
+ id + "-server");
this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.clientCached(properties),
RaftServerConfigKeys.ThreadPool.clientSize(properties),
- memberId + "-client");
- this.leaderElectionExecutor = MemoizedSupplier.valueOf(
- () -> new LeaderElection.Executor(memberId + "-election" ,
group.getPeers().size()));
+ id + "-client");
}
private long getCommitIndex(RaftPeerId id) {
@@ -538,20 +534,12 @@ class RaftServerImpl implements RaftServer.Division,
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception e) {
- LOG.warn("{}: Failed to shutdown clientExecutor", getMemberId(), e);
+ LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
}
try {
ConcurrentUtils.shutdownAndWait(serverExecutor);
} catch (Exception e) {
- LOG.warn("{}: Failed to shutdown serverExecutor", getMemberId(), e);
- }
-
- if (leaderElectionExecutor.isInitialized()) {
- try {
-
ConcurrentUtils.shutdownAndWait(leaderElectionExecutor.get().getExecutor());
- } catch (Exception e) {
- LOG.warn("{}: Failed to shutdown leaderElectionExecutor",
getMemberId(), e);
- }
+ LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
}
});
}
@@ -1537,10 +1525,6 @@ class RaftServerImpl implements RaftServer.Division,
return serverExecutor;
}
- LeaderElection.Executor getLeaderElectionExecutor() {
- return leaderElectionExecutor.get();
- }
-
private CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(RaftPeerId leaderId, long callId,
TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto>
requestRef) throws IOException {
final AppendEntriesRequestProto proto = requestRef.get();