This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit c596fd3383f9a74645b3467a462519488b842156 Author: Jiacheng Liu <[email protected]> AuthorDate: Wed Sep 14 19:30:44 2022 +0800 RATIS-1695. Use a Builder for Daemon (#747) (cherry picked from commit a04bf69cacec68e81c39a4f5db92e9eb73dd0cb7) --- .../main/java/org/apache/ratis/util/Daemon.java | 37 ++++++++++++++++------ .../org/apache/ratis/util/JvmPauseMonitor.java | 5 ++- .../org/apache/ratis/util/TimeoutScheduler.java | 6 ++-- .../apache/ratis/server/impl/FollowerState.java | 6 ++-- .../apache/ratis/server/impl/LeaderElection.java | 5 +-- .../apache/ratis/server/impl/LeaderStateImpl.java | 2 +- .../ratis/server/impl/StateMachineUpdater.java | 2 +- .../ratis/server/leader/LogAppenderDaemon.java | 2 +- .../apache/ratis/server/impl/MiniRaftCluster.java | 5 ++- .../ratis/server/simulation/RequestHandler.java | 1 + .../server/simulation/SimulatedServerRpc.java | 5 ++- .../statemachine/SimpleStateMachine4Testing.java | 4 +-- 12 files changed, 54 insertions(+), 26 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java index b31185474..8e576a0ac 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java @@ -17,24 +17,41 @@ */ package org.apache.ratis.util; +import java.util.Objects; + public class Daemon extends Thread { { setDaemon(true); } - /** Construct a daemon thread. */ - public Daemon() { - super(); + /** Construct a daemon thread with flexible arguments. */ + protected Daemon(Builder builder) { + super(builder.runnable); + setName(builder.name); } - /** Construct a daemon thread with the given runnable. */ - public Daemon(Runnable runnable) { - this(runnable, runnable.toString()); + /** @return a {@link Builder}. */ + public static Builder newBuilder() { + return new Builder(); } - /** Construct a daemon thread with the given runnable. */ - public Daemon(Runnable runnable, String name) { - super(runnable); - this.setName(name); + public static class Builder { + private String name; + private Runnable runnable; + + public Builder setName(String name) { + this.name = name; + return this; + } + + public Builder setRunnable(Runnable runnable) { + this.runnable = runnable; + return this; + } + + public Daemon build() { + Objects.requireNonNull(name, "name == null"); + return new Daemon(this); + } } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java index aa89bde0c..1fcfc4d6a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java @@ -28,11 +28,13 @@ import java.lang.management.MemoryManagerMXBean; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class JvmPauseMonitor { public static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class); + private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0); static final class GcInfo { private final long count; @@ -137,7 +139,8 @@ public class JvmPauseMonitor { /** Start this monitor. */ public void start() { - final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> new Daemon(this::run)); + final MemoizedSupplier<Thread> supplier = JavaUtils.memoize(() -> Daemon.newBuilder() + .setName("JvmPauseMonitor" + THREAD_COUNT.getAndIncrement()).setRunnable(this::run).build()); Optional.of(threadRef.updateAndGet(previous -> Optional.ofNullable(previous).orElseGet(supplier))) .filter(t -> supplier.isInitialized()) .ifPresent(Thread::start); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java index cba2851f4..d6be6c0ec 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java @@ -25,8 +25,8 @@ import java.util.Collection; import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -37,6 +37,7 @@ public final class TimeoutScheduler implements TimeoutExecutor { static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES); private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new); + private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0); public static TimeoutScheduler getInstance() { return INSTANCE.get(); @@ -84,7 +85,8 @@ public final class TimeoutScheduler implements TimeoutExecutor { private static ScheduledThreadPoolExecutor newExecutor() { LOG.debug("new ScheduledThreadPoolExecutor"); - final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(1, (ThreadFactory) Daemon::new); + final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(1, (runnable) -> Daemon.newBuilder() + .setName("TimeoutScheduler-" + THREAD_COUNT.getAndIncrement()).setRunnable(runnable).build()); e.setRemoveOnCancelPolicy(true); return e; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 52ae033f5..afad7c559 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -55,7 +55,6 @@ class FollowerState extends Daemon { static final Logger LOG = LoggerFactory.getLogger(FollowerState.class); - private final String name; private final Object reason; private final RaftServerImpl server; @@ -65,8 +64,7 @@ class FollowerState extends Daemon { private final AtomicInteger outstandingOp = new AtomicInteger(); FollowerState(RaftServerImpl server, Object reason) { - this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); - this.setName(this.name); + super(newBuilder().setName(server.getMemberId() + "-" + JavaUtils.getClassSimpleName(FollowerState.class))); this.server = server; this.reason = reason; } @@ -161,6 +159,6 @@ class FollowerState extends Daemon { @Override public String toString() { - return name; + return getName(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 5ed18975e..cc7f623e3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -138,7 +138,8 @@ class LeaderElection implements Runnable { Executor(Object name, int size) { Preconditions.assertTrue(size > 0); - executor = Executors.newFixedThreadPool(size, r -> new Daemon(r, name + "-" + count.incrementAndGet())); + executor = Executors.newFixedThreadPool(size, r -> + Daemon.newBuilder().setName(name + "-" + count.incrementAndGet()).setRunnable(r).build()); service = new ExecutorCompletionService<>(executor); } @@ -190,7 +191,7 @@ class LeaderElection implements Runnable { LeaderElection(RaftServerImpl server, boolean skipPreVote) { this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet(); this.lifeCycle = new LifeCycle(this); - this.daemon = new Daemon(this); + this.daemon = Daemon.newBuilder().setName(name).setRunnable(this).build(); this.server = server; this.skipPreVote = skipPreVote || !RaftServerConfigKeys.LeaderElection.preVote( diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index fbcbce448..fe36a1a6a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -618,7 +618,7 @@ class LeaderStateImpl implements LeaderState { */ private class EventProcessor extends Daemon { public EventProcessor(String name) { - setName(name); + super(Daemon.newBuilder().setName(name)); } @Override public void run() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 87aac06b2..321d1d71a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -110,7 +110,7 @@ class StateMachineUpdater implements Runnable { }; this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties); - updater = new Daemon(this); + updater = Daemon.newBuilder().setName(name).setRunnable(this).build(); this.awaitForSignal = new AwaitForSignal(name); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java index 6b2d60796..d1688987d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java @@ -48,7 +48,7 @@ class LogAppenderDaemon { this.logAppender = logAppender; this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass()); this.lifeCycle = new LifeCycle(name); - this.daemon = new Daemon(this::run, name); + this.daemon = Daemon.newBuilder().setName(name).setRunnable(this::run).build(); } public boolean isWorking() { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 1f4047524..64638a2e8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -73,6 +73,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -89,6 +90,7 @@ public abstract class MiniRaftCluster implements Closeable { private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine(); private static final TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + static final AtomicInteger THREAD_COUNT = new AtomicInteger(0); public static abstract class Factory<CLUSTER extends MiniRaftCluster> { public interface Get<CLUSTER extends MiniRaftCluster> { @@ -834,7 +836,8 @@ public abstract class MiniRaftCluster implements Closeable { // TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close) ExitUtils.setTerminateOnUncaughtException(false); - final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), Daemon::new); + final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), (t) -> + Daemon.newBuilder().setName("MiniRaftCluster-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build()); getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close))); try { executor.shutdown(); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java index 10382f0a4..6a5c9c881 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java @@ -106,6 +106,7 @@ public class RequestHandler<REQUEST extends RaftRpcMessage, private final int id; HandlerDaemon(int id) { + super(newBuilder().setName("HandlerDaemon-" + id)); this.id = id; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 91905d599..863432d9c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -51,15 +51,18 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; class SimulatedServerRpc implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); + static final AtomicInteger THREAD_COUNT = new AtomicInteger(0); private final RaftServer server; private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler; private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler; - private final ExecutorService executor = Executors.newFixedThreadPool(3, Daemon::new); + private final ExecutorService executor = Executors.newFixedThreadPool(3, (t) -> + Daemon.newBuilder().setName("SimulatedServerRpc-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build()); SimulatedServerRpc(RaftServer server, SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 7b5527090..c8755abab 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -169,7 +169,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private RaftGroupId groupId; public SimpleStateMachine4Testing() { - checkpointer = new Daemon(() -> { + checkpointer = Daemon.newBuilder().setName("SimpleStateMachine4Testing").setRunnable(() -> { while (running) { if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) { endIndexLastCkpt = takeSnapshot(); @@ -181,7 +181,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { Thread.currentThread().interrupt(); } } - }); + }).build(); } public Collecting collecting() {
