This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f446aa  RATIS-705. GrpcClientProtocolClient#close Interrupts itself.  
Contributed by Lokesh Jain
3f446aa is described below

commit 3f446aaf27704b0bf929bd39887637a6a71b4418
Author: Tsz Wo Nicholas Sze <szets...@apache.org>
AuthorDate: Fri Oct 11 16:35:38 2019 +0800

    RATIS-705. GrpcClientProtocolClient#close Interrupts itself.  Contributed 
by Lokesh Jain
---
 .../org/apache/ratis/util/TimeoutScheduler.java    |  9 +++--
 .../grpc/client/GrpcClientProtocolClient.java      |  2 +-
 .../org/apache/ratis/RaftAsyncExceptionTests.java  | 39 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 3 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 172018f..01ccf09 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -149,6 +149,10 @@ public final class TimeoutScheduler implements Closeable {
 
     final TimeDuration grace = getGracePeriod();
     LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
+    if (scheduler == null) {
+      shutdownTask = null;
+      return;
+    }
     final ScheduledFuture<?> future = schedule(scheduler, () -> 
tryShutdownScheduler(sid),
         () -> "shutdown task #" + sid, grace);
     shutdownTask = new ShutdownTask(sid, future);
@@ -161,7 +165,7 @@ public final class TimeoutScheduler implements Closeable {
   }
 
   private synchronized void tryShutdownScheduler(int sid) {
-    if (sid == scheduleID) {
+    if (sid == scheduleID && scheduler != null) {
       // No new tasks submitted, shutdown the scheduler.
       LOG.debug("shutdown scheduler: sid {}", sid);
       scheduler.shutdown();
@@ -176,7 +180,8 @@ public final class TimeoutScheduler implements Closeable {
     onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
   }
 
-  @Override public synchronized void close() {
+  @Override
+  public synchronized void close() {
     if (scheduler != null) {
       LOG.debug("Closing ThreadPool");
       scheduler.shutdownNow();
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 45cfeed..6e34433 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -140,13 +140,13 @@ public class GrpcClientProtocolClient implements 
Closeable {
   public void close() {
     
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
-    scheduler.close();
     channel.shutdown();
     try {
       channel.awaitTermination(5, TimeUnit.SECONDS);
     } catch (Exception e) {
       LOG.error("Unexpected exception while waiting for channel termination", 
e);
     }
+    scheduler.close();
   }
 
   RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws 
IOException {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index f48e989..cdb0e6b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -19,11 +19,15 @@ package org.apache.ratis;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,11 +36,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
 
+  {
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+  }
+
   @Test
   public void testGroupMismatchException() throws Exception {
     runWithNewCluster(1, this::runTestGroupMismatchException);
@@ -76,4 +86,33 @@ public abstract class RaftAsyncExceptionTests<CLUSTER 
extends MiniRaftCluster>
       }
     }
   }
+
+  @Test
+  public void testTimeoutException() throws Exception {
+    runWithNewCluster(3, this::runTestTimeoutException);
+  }
+
+  private void runTestTimeoutException(CLUSTER cluster) throws Exception {
+    // send a message to make sure the cluster is working
+    try(RaftClient client = cluster.createClient()) {
+      client.send(new SimpleMessage("m0"));
+
+      RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(),
+          TimeDuration.valueOf(3, TimeUnit.SECONDS));
+      // Block StartTransaction
+      cluster.getServers().stream()
+          .map(cluster::getRaftServerImpl)
+          .map(SimpleStateMachine4Testing::get)
+          .forEach(SimpleStateMachine4Testing::blockStartTransaction);
+      final CompletableFuture<RaftClientReply> replyFuture = 
client.sendAsync(new SimpleMessage("m1"));
+      Thread.sleep(10000);
+      // Unblock StartTransaction
+      cluster.getServers().stream()
+          .map(cluster::getRaftServerImpl)
+          .map(SimpleStateMachine4Testing::get)
+          .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
+      // The request should succeed after start transaction is unblocked
+      Assert.assertTrue(replyFuture.get().isSuccess());
+    }
+  }
 }

Reply via email to