This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c76ab77 RATIS-718. TimeoutScheduler throws IllegalStateException.
Contributed by Tsz Wo Nicholas Sze.
c76ab77 is described below
commit c76ab77da3c501cf51dd0c21d9b66ca484556ab4
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Oct 22 13:11:24 2019 +0530
RATIS-718. TimeoutScheduler throws IllegalStateException. Contributed by
Tsz Wo Nicholas Sze.
---
.../apache/ratis/client/impl/RaftClientImpl.java | 2 +-
.../org/apache/ratis/util/TimeoutScheduler.java | 89 ++++++++++++----------
.../grpc/client/GrpcClientProtocolClient.java | 2 +-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 2 +-
.../apache/ratis/server/impl/WatchRequests.java | 2 +-
.../apache/ratis/util/TestTimeoutScheduler.java | 19 +++--
6 files changed, 63 insertions(+), 53 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index b005cea..582b6b6 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -96,7 +96,7 @@ final class RaftClientImpl implements RaftClient {
Preconditions.assertTrue(retryPolicy != null, "retry policy can't be
null");
this.retryPolicy = retryPolicy;
- scheduler = TimeoutScheduler.newInstance(0);
+ scheduler = TimeoutScheduler.getInstance();
clientRpc.addServers(peers);
this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this,
properties));
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 01ccf09..4af13f1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Collection;
import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -38,11 +37,17 @@ public final class TimeoutScheduler implements Closeable {
static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1,
TimeUnit.MINUTES);
- public static TimeoutScheduler newInstance(int numThreads) {
- return new TimeoutScheduler(numThreads);
+ private static final Supplier<TimeoutScheduler> INSTANCE =
JavaUtils.memoize(TimeoutScheduler::new);
+
+ public static TimeoutScheduler getInstance() {
+ return INSTANCE.get();
+ }
+
+ static TimeoutScheduler newInstance() {
+ return new TimeoutScheduler();
}
- class ShutdownTask implements Runnable {
+ static class ShutdownTask {
private final int sid;
private final ScheduledFuture<?> future;
@@ -55,16 +60,41 @@ public final class TimeoutScheduler implements Closeable {
return sid;
}
- @Override
- public void run() {
- tryShutdownScheduler(sid);
- }
-
void cancel() {
future.cancel(false);
}
}
+ private static class Scheduler {
+ private final AtomicReference<ScheduledThreadPoolExecutor> executor = new
AtomicReference<>();
+
+ boolean hasExecutor() {
+ return executor.get() != null;
+ }
+
+ int getQueueSize() {
+ return Optional.ofNullable(executor.get())
+ .map(ScheduledThreadPoolExecutor::getQueue)
+ .map(Collection::size).orElse(0);
+ }
+
+ ScheduledFuture<?> schedule(Runnable task, Supplier<String> name,
TimeDuration time) {
+ return executor.updateAndGet(e ->
Optional.ofNullable(e).orElseGet(Scheduler::newExecutor))
+ .schedule(LogUtils.newRunnable(LOG, task, name), time.getDuration(),
time.getUnit());
+ }
+
+ private static ScheduledThreadPoolExecutor newExecutor() {
+ LOG.debug("new ScheduledThreadPoolExecutor");
+ final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(0,
(ThreadFactory) Daemon::new);
+ e.setRemoveOnCancelPolicy(true);
+ return e;
+ }
+
+ void shutdown() {
+
Optional.ofNullable(executor.getAndSet(null)).ifPresent(ScheduledThreadPoolExecutor::shutdown);
+ }
+ }
+
/** When there is no tasks, the time period to wait before shutting down the
scheduler. */
private final AtomicReference<TimeDuration> gracePeriod = new
AtomicReference<>(DEFAULT_GRACE_PERIOD);
@@ -75,15 +105,13 @@ public final class TimeoutScheduler implements Closeable {
private ShutdownTask shutdownTask = null;
- private final int numThreads;
- private volatile ScheduledThreadPoolExecutor scheduler = null;
+ private final Scheduler scheduler = new Scheduler();
- private TimeoutScheduler(int numThreads) {
- this.numThreads = numThreads;
+ private TimeoutScheduler() {
}
int getQueueSize() {
- return
Optional.ofNullable(scheduler).map(ScheduledThreadPoolExecutor::getQueue).map(Collection::size).orElse(0);
+ return scheduler.getQueueSize();
}
TimeDuration getGracePeriod() {
@@ -94,8 +122,8 @@ public final class TimeoutScheduler implements Closeable {
this.gracePeriod.set(gracePeriod);
}
- synchronized boolean hasScheduler() {
- return scheduler != null;
+ boolean hasScheduler() {
+ return scheduler.hasExecutor();
}
/**
@@ -120,17 +148,11 @@ public final class TimeoutScheduler implements Closeable {
}
private synchronized void onTimeout(TimeDuration timeout, Consumer<Integer>
toSchedule) {
- if (scheduler == null) {
- Preconditions.assertTrue(numTasks == 0);
- LOG.debug("Initialize scheduler");
- scheduler = new ScheduledThreadPoolExecutor(numThreads, (ThreadFactory)
Daemon::new);
- scheduler.setRemoveOnCancelPolicy(true);
- }
numTasks++;
final int sid = scheduleID++;
LOG.debug("schedule a task: timeout {}, sid {}", timeout, sid);
- schedule(scheduler, () -> toSchedule.accept(sid), () -> "task #" + sid,
timeout);
+ scheduler.schedule(() -> toSchedule.accept(sid), () -> "task #" + sid,
timeout);
}
private synchronized void onTaskCompleted() {
@@ -149,27 +171,16 @@ public final class TimeoutScheduler implements Closeable {
final TimeDuration grace = getGracePeriod();
LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
- if (scheduler == null) {
- shutdownTask = null;
- return;
- }
- final ScheduledFuture<?> future = schedule(scheduler, () ->
tryShutdownScheduler(sid),
+ final ScheduledFuture<?> future = scheduler.schedule(() ->
tryShutdownScheduler(sid),
() -> "shutdown task #" + sid, grace);
shutdownTask = new ShutdownTask(sid, future);
}
- private static ScheduledFuture<?> schedule(
- ScheduledExecutorService service, Runnable task, Supplier<String> name,
TimeDuration timeDuration) {
- return service.schedule(LogUtils.newRunnable(LOG, task, name),
- timeDuration.getDuration(), timeDuration.getUnit());
- }
-
private synchronized void tryShutdownScheduler(int sid) {
- if (sid == scheduleID && scheduler != null) {
+ if (sid == scheduleID) {
// No new tasks submitted, shutdown the scheduler.
LOG.debug("shutdown scheduler: sid {}", sid);
scheduler.shutdown();
- scheduler = null;
} else {
LOG.debug("shutdown cancelled: scheduleID has changed from {} to {}",
sid, scheduleID);
}
@@ -182,10 +193,6 @@ public final class TimeoutScheduler implements Closeable {
@Override
public synchronized void close() {
- if (scheduler != null) {
- LOG.debug("Closing ThreadPool");
- scheduler.shutdownNow();
- scheduler = null;
- }
+ tryShutdownScheduler(scheduleID);
}
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 6e34433..d6b9e13 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -80,7 +80,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final ManagedChannel channel;
private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
+ private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 6374c31..4e1e865 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -58,7 +58,7 @@ public class GrpcLogAppender extends LogAppender {
private final boolean installSnapshotEnabled;
private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
private volatile StreamObserver<AppendEntriesRequestProto>
appendLogRequestObserver;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 266e5cd..f7f76eb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -168,7 +168,7 @@ class WatchRequests {
private final TimeDuration watchTimeoutNanos;
private final TimeDuration watchTimeoutDenominationNanos;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
WatchRequests(Object name, RaftProperties properties) {
this.name = name + "-" + getClass().getSimpleName();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index fce230a..54526fe 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -47,7 +47,7 @@ public class TestTimeoutScheduler extends BaseTest {
@Test(timeout = 1000)
public void testSingleTask() throws Exception {
- final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
final TimeDuration grace = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
scheduler.setGracePeriod(grace);
Assert.assertFalse(scheduler.hasScheduler());
@@ -78,11 +78,12 @@ public class TestTimeoutScheduler extends BaseTest {
Assert.assertFalse(scheduler.hasScheduler());
errorHandler.assertNoError();
+ scheduler.setGracePeriod(grace);
}
@Test(timeout = 1000)
public void testMultipleTasks() throws Exception {
- final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
final TimeDuration grace = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
scheduler.setGracePeriod(grace);
Assert.assertFalse(scheduler.hasScheduler());
@@ -128,7 +129,7 @@ public class TestTimeoutScheduler extends BaseTest {
@Test(timeout = 1000)
public void testExtendingGracePeriod() throws Exception {
- final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
final TimeDuration grace = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
scheduler.setGracePeriod(grace);
Assert.assertFalse(scheduler.hasScheduler());
@@ -178,7 +179,7 @@ public class TestTimeoutScheduler extends BaseTest {
@Test(timeout = 1000)
public void testRestartingScheduler() throws Exception {
- final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
final TimeDuration grace = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
scheduler.setGracePeriod(grace);
Assert.assertFalse(scheduler.hasScheduler());
@@ -209,9 +210,9 @@ public class TestTimeoutScheduler extends BaseTest {
errorHandler.assertNoError();
}
- @Test(timeout = 1000)
+ @Test(timeout = 10_000)
public void testShutdown() throws Exception {
- final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
+ final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
Assert.assertEquals(TimeoutScheduler.DEFAULT_GRACE_PERIOD,
scheduler.getGracePeriod());
final ErrorHandler errorHandler = new ErrorHandler();
@@ -222,7 +223,8 @@ public class TestTimeoutScheduler extends BaseTest {
}
HUNDRED_MILLIS.sleep();
HUNDRED_MILLIS.sleep();
- Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task
is scheduled
+ JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+ 10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
final TimeDuration oneMillis = TimeDuration.valueOf(1,
TimeUnit.MILLISECONDS);
for(int i = 0; i < numTasks; i++) {
@@ -232,7 +234,8 @@ public class TestTimeoutScheduler extends BaseTest {
oneMillis.sleep();
}
HUNDRED_MILLIS.sleep();
- Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task
is scheduled
+ JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+ 10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
errorHandler.assertNoError();
}