HDDS-109. Add reconnect logic for XceiverClientGrpc.
Contributed by Lokesh Jain.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43baa036
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43baa036
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43baa036

Branch: refs/heads/HADOOP-15407
Commit: 43baa036aeb025bcbed1aca19837b072f2c14a6a
Parents: 8e7548d
Author: Anu Engineer <aengin...@apache.org>
Authored: Wed Jun 13 09:50:10 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Wed Jun 13 09:50:10 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientGrpc.java      | 23 ++++++++++++++++
 .../ozone/scm/TestXceiverClientManager.java     |  6 ++--
 .../hadoop/ozone/web/client/TestKeys.java       | 29 ++++++++++++++++----
 3 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/43baa036/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index c787024..92df46e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -55,6 +55,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   private XceiverClientMetrics metrics;
   private ManagedChannel channel;
   private final Semaphore semaphore;
+  private boolean closed = false;
 
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -105,6 +106,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 
   @Override
   public void close() {
+    closed = true;
     channel.shutdownNow();
     try {
       channel.awaitTermination(60, TimeUnit.MINUTES);
@@ -153,6 +155,14 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   public CompletableFuture<ContainerCommandResponseProto>
       sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
+    if(closed){
+      throw new IOException("This channel is not connected.");
+    }
+
+    if(channel == null || !isConnected()) {
+      reconnect();
+    }
+
     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
         new CompletableFuture<>();
     semaphore.acquire();
@@ -192,6 +202,19 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return replyFuture;
   }
 
+  private void reconnect() throws IOException {
+    try {
+      connect();
+    } catch (Exception e) {
+      LOG.error("Error while connecting: ", e);
+      throw new IOException(e);
+    }
+
+    if (channel == null || !isConnected()) {
+      throw new IOException("This channel is not connected.");
+    }
+  }
+
   /**
    * Create a pipeline.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43baa036/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 478cf69..56f3c7a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -163,8 +163,7 @@ public class TestXceiverClientManager {
     // and any container operations should fail
     clientManager.releaseClient(client1);
 
-    String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
-        "This channel is not connected.";
+    String expectedMessage = "This channel is not connected.";
     try {
       ContainerProtocolCalls.createContainer(client1,
           container1.getContainerID(), traceID1);
@@ -213,8 +212,7 @@ public class TestXceiverClientManager {
 
     // Any container operation should now fail
     String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
-    String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
-        "This channel is not connected.";
+    String expectedMessage = "This channel is not connected.";
     try {
       ContainerProtocolCalls.createContainer(client1,
           container1.getContainerID(), traceID2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43baa036/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index 28a138e..b86c577 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -62,12 +63,14 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -77,6 +80,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -93,6 +97,7 @@ import static org.junit.Assert.fail;
 /**
  * Test Ozone Key Lifecycle.
  */
+@RunWith(Parameterized.class)
 public class TestKeys {
   /**
    * Set the timeout for every test.
@@ -107,19 +112,31 @@ public class TestKeys {
   private static long currentTime;
   private static ReplicationFactor replicationFactor = ReplicationFactor.ONE;
   private static ReplicationType replicationType = ReplicationType.STAND_ALONE;
+  private static boolean shouldUseGrpc;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> withGrpc() {
+    return Arrays.asList(new Object[][] {{false}, {true}});
+  }
+
+  public TestKeys(boolean useGrpc) {
+    shouldUseGrpc = useGrpc;
+  }
 
   /**
    * Create a MiniDFSCluster for testing.
    *
    * @throws IOException
    */
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     conf = new OzoneConfiguration();
 
     // Set short block deleting service interval to speed up deletions.
     conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
         1000, TimeUnit.MILLISECONDS);
+    conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        shouldUseGrpc);
 
     path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
     Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
@@ -133,8 +150,8 @@ public class TestKeys {
   /**
    * shutdown MiniDFSCluster.
    */
-  @AfterClass
-  public static void shutdown() {
+  @After
+  public void shutdown() {
     if (ozoneCluster != null) {
       ozoneCluster.shutdown();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to