This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c76ab77  RATIS-718. TimeoutScheduler throws IllegalStateException. Contributed by Tsz Wo Nicholas Sze.
c76ab77 is described below

commit c76ab77da3c501cf51dd0c21d9b66ca484556ab4
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Oct 22 13:11:24 2019 +0530

    RATIS-718. TimeoutScheduler throws IllegalStateException. Contributed by Tsz Wo Nicholas Sze.
---
 .../apache/ratis/client/impl/RaftClientImpl.java   |  2 +-
 .../org/apache/ratis/util/TimeoutScheduler.java    | 89 ++++++++++++----------
 .../grpc/client/GrpcClientProtocolClient.java      |  2 +-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  2 +-
 .../apache/ratis/server/impl/WatchRequests.java    |  2 +-
 .../apache/ratis/util/TestTimeoutScheduler.java    | 19 +++--
 6 files changed, 63 insertions(+), 53 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index b005cea..582b6b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -96,7 +96,7 @@ final class RaftClientImpl implements RaftClient {
     Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null");
     this.retryPolicy = retryPolicy;
 
-    scheduler = TimeoutScheduler.newInstance(0);
+    scheduler = TimeoutScheduler.getInstance();
     clientRpc.addServers(peers);
 
     this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this, properties));
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 01ccf09..4af13f1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.Collection;
 import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -38,11 +37,17 @@ public final class TimeoutScheduler implements Closeable {
 
   static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
 
-  public static TimeoutScheduler newInstance(int numThreads) {
-    return new TimeoutScheduler(numThreads);
+  private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new);
+
+  public static TimeoutScheduler getInstance() {
+    return INSTANCE.get();
+  }
+
+  static TimeoutScheduler newInstance() {
+    return new TimeoutScheduler();
   }
 
-  class ShutdownTask implements Runnable {
+  static class ShutdownTask {
     private final int sid;
     private final ScheduledFuture<?> future;
 
@@ -55,16 +60,41 @@ public final class TimeoutScheduler implements Closeable {
       return sid;
     }
 
-    @Override
-    public void run() {
-      tryShutdownScheduler(sid);
-    }
-
     void cancel() {
       future.cancel(false);
     }
   }
 
+  private static class Scheduler {
+    private final AtomicReference<ScheduledThreadPoolExecutor> executor = new AtomicReference<>();
+
+    boolean hasExecutor() {
+      return executor.get() != null;
+    }
+
+    int getQueueSize() {
+      return Optional.ofNullable(executor.get())
+          .map(ScheduledThreadPoolExecutor::getQueue)
+          .map(Collection::size).orElse(0);
+    }
+
+    ScheduledFuture<?> schedule(Runnable task, Supplier<String> name, TimeDuration time) {
+      return executor.updateAndGet(e -> Optional.ofNullable(e).orElseGet(Scheduler::newExecutor))
+          .schedule(LogUtils.newRunnable(LOG, task, name), time.getDuration(), time.getUnit());
+    }
+
+    private static ScheduledThreadPoolExecutor newExecutor() {
+      LOG.debug("new ScheduledThreadPoolExecutor");
+      final ScheduledThreadPoolExecutor e = new ScheduledThreadPoolExecutor(0, (ThreadFactory) Daemon::new);
+      e.setRemoveOnCancelPolicy(true);
+      return e;
+    }
+
+    void shutdown() {
+      Optional.ofNullable(executor.getAndSet(null)).ifPresent(ScheduledThreadPoolExecutor::shutdown);
+    }
+  }
+
   /** When there is no tasks, the time period to wait before shutting down the scheduler. */
   private final AtomicReference<TimeDuration> gracePeriod = new AtomicReference<>(DEFAULT_GRACE_PERIOD);
 
@@ -75,15 +105,13 @@ public final class TimeoutScheduler implements Closeable {
 
   private ShutdownTask shutdownTask = null;
 
-  private final int numThreads;
-  private volatile ScheduledThreadPoolExecutor scheduler = null;
+  private final Scheduler scheduler = new Scheduler();
 
-  private TimeoutScheduler(int numThreads) {
-    this.numThreads = numThreads;
+  private TimeoutScheduler() {
   }
 
   int getQueueSize() {
-    return Optional.ofNullable(scheduler).map(ScheduledThreadPoolExecutor::getQueue).map(Collection::size).orElse(0);
+    return scheduler.getQueueSize();
   }
 
   TimeDuration getGracePeriod() {
@@ -94,8 +122,8 @@ public final class TimeoutScheduler implements Closeable {
     this.gracePeriod.set(gracePeriod);
   }
 
-  synchronized boolean hasScheduler() {
-    return scheduler != null;
+  boolean hasScheduler() {
+    return scheduler.hasExecutor();
   }
 
   /**
@@ -120,17 +148,11 @@ public final class TimeoutScheduler implements Closeable {
   }
 
   private synchronized void onTimeout(TimeDuration timeout, Consumer<Integer> toSchedule) {
-    if (scheduler == null) {
-      Preconditions.assertTrue(numTasks == 0);
-      LOG.debug("Initialize scheduler");
-      scheduler = new ScheduledThreadPoolExecutor(numThreads, (ThreadFactory) Daemon::new);
-      scheduler.setRemoveOnCancelPolicy(true);
-    }
     numTasks++;
     final int sid = scheduleID++;
 
     LOG.debug("schedule a task: timeout {}, sid {}", timeout, sid);
-    schedule(scheduler, () -> toSchedule.accept(sid), () -> "task #" + sid, timeout);
+    scheduler.schedule(() -> toSchedule.accept(sid), () -> "task #" + sid, timeout);
   }
 
   private synchronized void onTaskCompleted() {
@@ -149,27 +171,16 @@ public final class TimeoutScheduler implements Closeable {
 
     final TimeDuration grace = getGracePeriod();
     LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
-    if (scheduler == null) {
-      shutdownTask = null;
-      return;
-    }
-    final ScheduledFuture<?> future = schedule(scheduler, () -> tryShutdownScheduler(sid),
+    final ScheduledFuture<?> future = scheduler.schedule(() -> tryShutdownScheduler(sid),
         () -> "shutdown task #" + sid, grace);
     shutdownTask = new ShutdownTask(sid, future);
   }
 
-  private static ScheduledFuture<?> schedule(
-      ScheduledExecutorService service, Runnable task, Supplier<String> name, TimeDuration timeDuration) {
-    return service.schedule(LogUtils.newRunnable(LOG, task, name),
-        timeDuration.getDuration(), timeDuration.getUnit());
-  }
-
   private synchronized void tryShutdownScheduler(int sid) {
-    if (sid == scheduleID && scheduler != null) {
+    if (sid == scheduleID) {
       // No new tasks submitted, shutdown the scheduler.
       LOG.debug("shutdown scheduler: sid {}", sid);
       scheduler.shutdown();
-      scheduler = null;
     } else {
       LOG.debug("shutdown cancelled: scheduleID has changed from {} to {}", sid, scheduleID);
     }
@@ -182,10 +193,6 @@ public final class TimeoutScheduler implements Closeable {
 
   @Override
   public synchronized void close() {
-    if (scheduler != null) {
-      LOG.debug("Closing ThreadPool");
-      scheduler.shutdownNow();
-      scheduler = null;
-    }
+    tryShutdownScheduler(scheduleID);
   }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 6e34433..d6b9e13 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -80,7 +80,7 @@ public class GrpcClientProtocolClient implements Closeable {
   private final ManagedChannel channel;
 
   private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   private final RaftClientProtocolServiceBlockingStub blockingStub;
   private final RaftClientProtocolServiceStub asyncStub;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 6374c31..4e1e865 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -58,7 +58,7 @@ public class GrpcLogAppender extends LogAppender {
   private final boolean installSnapshotEnabled;
 
   private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 266e5cd..f7f76eb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -168,7 +168,7 @@ class WatchRequests {
 
   private final TimeDuration watchTimeoutNanos;
   private final TimeDuration watchTimeoutDenominationNanos;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   WatchRequests(Object name, RaftProperties properties) {
     this.name = name + "-" + getClass().getSimpleName();
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index fce230a..54526fe 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -47,7 +47,7 @@ public class TestTimeoutScheduler extends BaseTest {
 
   @Test(timeout = 1000)
   public void testSingleTask() throws Exception {
-    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
     final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
     scheduler.setGracePeriod(grace);
     Assert.assertFalse(scheduler.hasScheduler());
@@ -78,11 +78,12 @@ public class TestTimeoutScheduler extends BaseTest {
     Assert.assertFalse(scheduler.hasScheduler());
 
     errorHandler.assertNoError();
+    scheduler.setGracePeriod(grace);
   }
 
   @Test(timeout = 1000)
   public void testMultipleTasks() throws Exception {
-    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
     final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
     scheduler.setGracePeriod(grace);
     Assert.assertFalse(scheduler.hasScheduler());
@@ -128,7 +129,7 @@ public class TestTimeoutScheduler extends BaseTest {
 
   @Test(timeout = 1000)
   public void testExtendingGracePeriod() throws Exception {
-    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
     final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
     scheduler.setGracePeriod(grace);
     Assert.assertFalse(scheduler.hasScheduler());
@@ -178,7 +179,7 @@ public class TestTimeoutScheduler extends BaseTest {
 
   @Test(timeout = 1000)
   public void testRestartingScheduler() throws Exception {
-    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
     final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
     scheduler.setGracePeriod(grace);
     Assert.assertFalse(scheduler.hasScheduler());
@@ -209,9 +210,9 @@ public class TestTimeoutScheduler extends BaseTest {
     errorHandler.assertNoError();
   }
 
-  @Test(timeout = 1000)
+  @Test(timeout = 10_000)
   public void testShutdown() throws Exception {
-    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
+    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance();
     Assert.assertEquals(TimeoutScheduler.DEFAULT_GRACE_PERIOD, scheduler.getGracePeriod());
     final ErrorHandler errorHandler = new ErrorHandler();
 
@@ -222,7 +223,8 @@ public class TestTimeoutScheduler extends BaseTest {
     }
     HUNDRED_MILLIS.sleep();
     HUNDRED_MILLIS.sleep();
-    Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task is scheduled
+    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+        10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
 
     final TimeDuration oneMillis = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
     for(int i = 0; i < numTasks; i++) {
@@ -232,7 +234,8 @@ public class TestTimeoutScheduler extends BaseTest {
       oneMillis.sleep();
     }
     HUNDRED_MILLIS.sleep();
-    Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task is scheduled
+    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+        10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
 
     errorHandler.assertNoError();
   }

Reply via email to