This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 58389a6ef401ce5e575c0095932d983e42487937 Author: Jiacheng Liu <[email protected]> AuthorDate: Wed Sep 21 15:53:36 2022 +0800 RATIS-1709 Support specify ThreadGroup for Daemon threads (#733) (cherry picked from commit 31eff226ded5a0d8d1d64a50661a6c6260da84ef) --- .../main/java/org/apache/ratis/util/Daemon.java | 8 +++++++- .../java/org/apache/ratis/server/RaftServer.java | 23 ++++++++++++++++++---- .../apache/ratis/server/impl/FollowerState.java | 4 +++- .../apache/ratis/server/impl/LeaderElection.java | 3 ++- .../apache/ratis/server/impl/LeaderStateImpl.java | 7 ++++--- .../apache/ratis/server/impl/RaftServerImpl.java | 9 ++++++++- .../apache/ratis/server/impl/RaftServerProxy.java | 8 +++++++- .../apache/ratis/server/impl/ServerImplUtils.java | 10 +++++----- .../ratis/server/impl/StateMachineUpdater.java | 4 ++-- .../ratis/server/leader/LogAppenderDaemon.java | 3 ++- .../apache/ratis/server/impl/MiniRaftCluster.java | 2 +- 11 files changed, 60 insertions(+), 21 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java index 8e576a0ac..b3797fa7e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java @@ -26,7 +26,7 @@ public class Daemon extends Thread { /** Construct a daemon thread with flexible arguments. */ protected Daemon(Builder builder) { - super(builder.runnable); + super(builder.threadGroup, builder.runnable); setName(builder.name); } @@ -38,6 +38,7 @@ public class Daemon extends Thread { public static class Builder { private String name; private Runnable runnable; + private ThreadGroup threadGroup; public Builder setName(String name) { this.name = name; @@ -49,6 +50,11 @@ public class Daemon extends Thread { return this; } + public Builder setThreadGroup(ThreadGroup threadGroup) { + this.threadGroup = threadGroup; + return this; + } + public Daemon build() { Objects.requireNonNull(name, "name == null"); return new Daemon(this); diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index e9719b96c..2c01e7aaa 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -116,6 +116,9 @@ public interface RaftServer extends Closeable, RpcType.Get, /** @return the internal {@link RaftClient} of this division. */ RaftClient getRaftClient(); + /** @return the {@link ThreadGroup} the threads of this Division belong to. */ + ThreadGroup getThreadGroup(); + @Override void close(); } @@ -168,7 +171,7 @@ public interface RaftServer extends Closeable, RpcType.Get, private static Method initNewRaftServerMethod() { final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils"; final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class, - RaftProperties.class, Parameters.class}; + ThreadGroup.class, RaftProperties.class, Parameters.class}; try { final Class<?> clazz = ReflectionUtils.getClassByName(className); return clazz.getMethod("newRaftServer", argClasses); @@ -178,11 +181,11 @@ public interface RaftServer extends Closeable, RpcType.Get, } private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, - StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) - throws IOException { + StateMachine.Registry stateMachineRegistry, ThreadGroup threadGroup, + RaftProperties properties, Parameters parameters) throws IOException { try { return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null, - serverId, group, stateMachineRegistry, properties, parameters); + serverId, group, stateMachineRegistry, threadGroup, properties, parameters); } catch (IllegalAccessException e) { throw new IllegalStateException("Failed to build " + serverId, e); } catch (InvocationTargetException e) { @@ -195,6 +198,7 @@ public interface RaftServer extends Closeable, RpcType.Get, private RaftGroup group = null; private RaftProperties properties; private Parameters parameters; + private ThreadGroup threadGroup; /** @return a {@link RaftServer} object. */ public RaftServer build() throws IOException { @@ -203,6 +207,7 @@ public interface RaftServer extends Closeable, RpcType.Get, group, Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " + "is initialized."), + threadGroup, Objects.requireNonNull(properties, "The 'properties' field is not initialized."), parameters); } @@ -241,5 +246,15 @@ public interface RaftServer extends Closeable, RpcType.Get, this.parameters = parameters; return this; } + + /** + * Set {@link ThreadGroup} so the application can control RaftServer threads consistently with the application. + * For example, configure {@link ThreadGroup#uncaughtException(Thread, Throwable)} for the whole thread group. + * If not set, the new thread will be put into the thread group of the caller thread. + */ + public Builder setThreadGroup(ThreadGroup threadGroup) { + this.threadGroup = threadGroup; + return this; + } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index afad7c559..3911e39a5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -64,7 +64,9 @@ class FollowerState extends Daemon { private final AtomicInteger outstandingOp = new AtomicInteger(); FollowerState(RaftServerImpl server, Object reason) { - super(newBuilder().setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class))); + super(newBuilder() + .setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class)) + .setThreadGroup(server.getThreadGroup())); this.server = server; this.reason = reason; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index cc7f623e3..ced72604a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -191,7 +191,8 @@ class LeaderElection implements Runnable { LeaderElection(RaftServerImpl server, boolean skipPreVote) { this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet(); this.lifeCycle = new LifeCycle(this); - this.daemon = Daemon.newBuilder().setName(name).setRunnable(this).build(); + this.daemon = Daemon.newBuilder().setName(name).setRunnable(this) + .setThreadGroup(server.getThreadGroup()).build(); this.server = server; this.skipPreVote = skipPreVote || !RaftServerConfigKeys.LeaderElection.preVote( diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index fe36a1a6a..e14653225 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -269,7 +269,7 @@ class LeaderStateImpl implements LeaderState { this.currentTerm = state.getCurrentTerm(); this.eventQueue = new EventQueue(); - processor = new EventProcessor(this.name); + processor = new EventProcessor(this.name, server); raftServerMetrics = server.getRaftServerMetrics(); logAppenderMetrics = new LogAppenderMetrics(server.getMemberId()); this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics); @@ -617,8 +617,9 @@ class LeaderStateImpl implements LeaderState { * state, such as changing to follower, or updating the committed index. */ private class EventProcessor extends Daemon { - public EventProcessor(String name) { - super(Daemon.newBuilder().setName(name)); + public EventProcessor(String name, RaftServerImpl server) { + super(Daemon.newBuilder() + .setName(name).setThreadGroup(server.getThreadGroup())); } @Override public void run() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 1994fa8de..2aa14cbed 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -97,7 +97,7 @@ import org.apache.ratis.util.function.CheckedSupplier; class RaftServerImpl implements RaftServer.Division, RaftServerProtocol, RaftServerAsynchronousProtocol, - RaftClientProtocol, RaftClientAsynchronousProtocol{ + RaftClientProtocol, RaftClientAsynchronousProtocol { private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class); static final String REQUEST_VOTE = CLASS_NAME + ".requestVote"; static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; @@ -188,6 +188,7 @@ class RaftServerImpl implements RaftServer.Division, private final ExecutorService clientExecutor; private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); + private final ThreadGroup threadGroup; RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException { final RaftPeerId id = proxy.getId(); @@ -213,6 +214,7 @@ class RaftServerImpl implements RaftServer.Division, getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics); this.startComplete = new AtomicBoolean(false); + this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder() .setRaftGroup(group) @@ -271,6 +273,11 @@ class RaftServerImpl implements RaftServer.Division, return sleepDeviationThreshold; } + @Override + public ThreadGroup getThreadGroup() { + return threadGroup; + } + @Override public StateMachine getStateMachine() { return stateMachine; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index b8cee7f53..bef72ee0d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -193,9 +193,10 @@ class RaftServerProxy implements RaftServer { private final ExecutorService executor; private final JvmPauseMonitor pauseMonitor; + private final ThreadGroup threadGroup; RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry, - RaftProperties properties, Parameters parameters) { + RaftProperties properties, Parameters parameters, ThreadGroup threadGroup) { this.properties = properties; this.stateMachineRegistry = stateMachineRegistry; @@ -218,6 +219,7 @@ class RaftServerProxy implements RaftServer { final TimeDuration leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties); this.pauseMonitor = new JvmPauseMonitor(id, extraSleep -> handleJvmPause(extraSleep, rpcSlownessTimeout, leaderStepDownWaitTime)); + this.threadGroup = threadGroup == null ? new ThreadGroup(this.id.toString()) : threadGroup; } private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold, TimeDuration stepDownThreshold) @@ -376,6 +378,10 @@ class RaftServerProxy implements RaftServer { return lifeCycle.getCurrentState(); } + ThreadGroup getThreadGroup() { + return threadGroup; + } + @Override public void start() throws IOException { lifeCycle.startAndTransition(this::startImpl, IOException.class); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 6777b9093..6a29b4cbb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -46,26 +46,26 @@ public final class ServerImplUtils { /** Create a {@link RaftServerProxy}. */ public static RaftServerProxy newRaftServer( RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry, - RaftProperties properties, Parameters parameters) throws IOException { + ThreadGroup threadGroup, RaftProperties properties, Parameters parameters) throws IOException { RaftServer.LOG.debug("newRaftServer: {}, {}", id, group); if (group != null && !group.getPeers().isEmpty()) { Preconditions.assertNotNull(id, "RaftPeerId %s is not in RaftGroup %s", id, group); Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group); } - final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters); + final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, threadGroup, properties, parameters); proxy.initGroups(group); return proxy; } private static RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) - throws IOException { + RaftPeerId id, StateMachine.Registry stateMachineRegistry, ThreadGroup threadGroup, RaftProperties properties, + Parameters parameters) throws IOException { final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS); final RaftServerProxy proxy; try { // attempt multiple times to avoid temporary bind exception proxy = JavaUtils.attemptRepeatedly( - () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters), + () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters, threadGroup), 5, sleepTime, "new RaftServerProxy", RaftServer.LOG); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 321d1d71a..bd62b65ac 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -109,8 +109,8 @@ class StateMachineUpdater implements Runnable { } }; this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties); - - updater = Daemon.newBuilder().setName(name).setRunnable(this).build(); + updater = Daemon.newBuilder().setName(name).setRunnable(this) + .setThreadGroup(server.getThreadGroup()).build(); this.awaitForSignal = new AwaitForSignal(name); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java index d1688987d..d985a6ae8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java @@ -48,7 +48,8 @@ class LogAppenderDaemon { this.logAppender = logAppender; this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass()); this.lifeCycle = new LifeCycle(name); - this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run).build(); + this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run) + .setThreadGroup(logAppender.getServer().getThreadGroup()).build(); } public boolean isWorking() { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 64638a2e8..ec2c285e5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -387,7 +387,7 @@ public abstract class MiniRaftCluster implements Closeable { } final RaftProperties prop = new RaftProperties(properties); RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir)); - return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop, + return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), null, prop, setPropertiesAndInitParameters(id, group, prop)); } catch (IOException e) { throw new RuntimeException(e);
