Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.3 732df81a6 -> db90350c9


HDDS-797. If DN is started before SCM, it does not register. Contributed by Hanisha Koneru.

(cherry picked from commit c8ca1747c08d905cdefaa5566dd58d770a6b71bd)
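
At a high level, the change makes VersionEndpointTask contact the SCM only while the
endpoint state machine is still in the GETVERSION state, and it advances the state only
after a successful getVersion reply. A failed call (SCM not yet up) leaves the endpoint
in GETVERSION, so the datanode simply retries on the next iteration instead of wedging.
The snippet below is a minimal, self-contained sketch of that pattern under simplified
assumptions: the State enum and ScmClient interface here are hypothetical stand-ins, not
the real HDDS classes shown in the diff further down.

import java.io.IOException;
import java.util.concurrent.Callable;

public class GetVersionTaskSketch implements Callable<GetVersionTaskSketch.State> {

  enum State { GETVERSION, REGISTER, SHUTDOWN }

  /** Hypothetical stand-in for the SCM RPC proxy; not the real HDDS protocol class. */
  interface ScmClient {
    String getVersion() throws IOException;
  }

  private final ScmClient scm;
  private State state = State.GETVERSION;

  public GetVersionTaskSketch(ScmClient scm) {
    this.scm = scm;
  }

  @Override
  public State call() {
    // Only talk to the SCM while we are still waiting for its version; once the
    // endpoint has moved past GETVERSION (or been shut down) this task is a no-op.
    if (state != State.GETVERSION) {
      return state;
    }
    try {
      String version = scm.getVersion();   // fails while the SCM is still down
      System.out.println("Got SCM version: " + version);
      state = State.REGISTER;              // advance only after a successful reply
    } catch (IOException e) {
      // Stay in GETVERSION so the next iteration retries, rather than the endpoint
      // getting stuck when the DN starts before the SCM.
      System.out.println("SCM not reachable yet, will retry: " + e.getMessage());
    }
    return state;
  }
}

A driver (in the real code, the datanode's endpoint state-machine loop) keeps submitting
the task on its schedule; progress is made only once getVersion succeeds.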


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db90350c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db90350c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db90350c

Branch: refs/heads/ozone-0.3
Commit: db90350c97cfe1f3cffb2f1e6df53e353e1c25af
Parents: 732df81
Author: Arpit Agarwal <a...@apache.org>
Authored: Mon Nov 5 09:40:00 2018 -0800
Committer: Arpit Agarwal <a...@apache.org>
Committed: Mon Nov 5 10:07:43 2018 -0800

----------------------------------------------------------------------
 .../states/endpoint/VersionEndpointTask.java    | 79 +++++++++++---------
 .../hadoop/ozone/TestMiniOzoneCluster.java      | 52 ++++++++++++-
 2 files changed, 94 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/db90350c/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 79fa174..2d00da8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -64,50 +64,57 @@ public class VersionEndpointTask implements
   public EndpointStateMachine.EndPointStates call() throws Exception {
     rpcEndPoint.lock();
     try{
-      SCMVersionResponseProto versionResponse =
-          rpcEndPoint.getEndPoint().getVersion(null);
-      VersionResponse response = VersionResponse.getFromProtobuf(
-          versionResponse);
-      rpcEndPoint.setVersion(response);
+      if (rpcEndPoint.getState().equals(
+          EndpointStateMachine.EndPointStates.GETVERSION)) {
+        SCMVersionResponseProto versionResponse =
+            rpcEndPoint.getEndPoint().getVersion(null);
+        VersionResponse response = VersionResponse.getFromProtobuf(
+            versionResponse);
+        rpcEndPoint.setVersion(response);
 
-      String scmId = response.getValue(OzoneConsts.SCM_ID);
-      String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
+        String scmId = response.getValue(OzoneConsts.SCM_ID);
+        String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
 
-      // Check volumes
-      VolumeSet volumeSet = ozoneContainer.getVolumeSet();
-      volumeSet.writeLock();
-      try {
-        Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
+        // Check volumes
+        VolumeSet volumeSet = ozoneContainer.getVolumeSet();
+        volumeSet.writeLock();
+        try {
+          Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
 
-        Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
-            "null");
-        Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
-            "cannot be null");
+          Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
+              "null");
+          Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
+              "cannot be null");
 
-        // If version file does not exist create version file and also set scmId
-        for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-          HddsVolume hddsVolume = entry.getValue();
-          boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
-              clusterId, LOG);
-          if (!result) {
-            volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
+          // If version file does not exist create version file and also set scmId
+
+          for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
+            HddsVolume hddsVolume = entry.getValue();
+            boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
+                clusterId, LOG);
+            if (!result) {
+              volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
+            }
           }
+          if (volumeSet.getVolumesList().size() == 0) {
+            // All volumes are in inconsistent state
+            throw new DiskOutOfSpaceException("All configured Volumes are in " +
+                "Inconsistent State");
+          }
+        } finally {
+          volumeSet.writeUnlock();
         }
-        if (volumeSet.getVolumesList().size() == 0) {
-          // All volumes are in inconsistent state
-          throw new DiskOutOfSpaceException("All configured Volumes are in " +
-              "Inconsistent State");
-        }
-      } finally {
-        volumeSet.writeUnlock();
-      }
 
-      ozoneContainer.getDispatcher().setScmId(scmId);
+        ozoneContainer.getDispatcher().setScmId(scmId);
 
-      EndpointStateMachine.EndPointStates nextState =
-          rpcEndPoint.getState().getNextState();
-      rpcEndPoint.setState(nextState);
-      rpcEndPoint.zeroMissedCount();
+        EndpointStateMachine.EndPointStates nextState =
+            rpcEndPoint.getState().getNextState();
+        rpcEndPoint.setState(nextState);
+        rpcEndPoint.zeroMissedCount();
+      } else {
+        LOG.debug("Cannot execute GetVersion task as endpoint state machine " +
+            "is in {} state", rpcEndPoint.getState());
+      }
     } catch (DiskOutOfSpaceException ex) {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
     } catch(IOException ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db90350c/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index bf6a189..7f25e4e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -22,9 +22,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
 import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
@@ -212,4 +216,50 @@ public class TestMiniOzoneCluster {
     out.write("malformed".getBytes());
     out.close();
   }
+
+  /**
+   * Test that a DN can register with SCM even if it was started before the SCM.
+   * @throws Exception
+   */
+  @Test (timeout = 300_000)
+  public void testDNstartAfterSCM() throws Exception {
+    // Start a cluster with 1 DN
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(1)
+        .build();
+    cluster.waitForClusterToBeReady();
+
+    // Stop the SCM
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    scm.stop();
+
+    // Restart DN
+    cluster.restartHddsDatanode(0, false);
+
+    // DN should be in GETVERSION state till the SCM is restarted.
+    // Check DN endpoint state for 20 seconds
+    DatanodeStateMachine dnStateMachine = cluster.getHddsDatanodes().get(0)
+        .getDatanodeStateMachine();
+    for (int i = 0; i < 20; i++) {
+      for (EndpointStateMachine endpoint :
+          dnStateMachine.getConnectionManager().getValues()) {
+        Assert.assertEquals(
+            EndpointStateMachine.EndPointStates.GETVERSION,
+            endpoint.getState());
+      }
+      Thread.sleep(1000);
+    }
+
+    // DN should successfully register with the SCM after SCM is restarted.
+    // Restart the SCM
+    cluster.restartStorageContainerManager();
+    // Wait for DN to register
+    cluster.waitForClusterToBeReady();
+    // DN should be in HEARTBEAT state after registering with the SCM
+    for (EndpointStateMachine endpoint :
+        dnStateMachine.getConnectionManager().getValues()) {
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          endpoint.getState());
+    }
+  }
 }
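
As a side note on the test above: the GETVERSION phase is verified with a fixed 20 x 1
second polling loop, and the HEARTBEAT phase relies on waitForClusterToBeReady(). The
same assertions could be driven by a small generic polling helper; the sketch below is
illustrative only and does not correspond to any existing MiniOzoneCluster or Hadoop
test utility.

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper for eventually-true test conditions; a sketch only. */
public final class AwaitCondition {
  private AwaitCondition() {
  }

  /** Re-check {@code condition} every {@code intervalMs} until it holds or {@code timeoutMs} elapses. */
  public static void await(BooleanSupplier condition, long intervalMs, long timeoutMs)
      throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}

With such a helper, the HEARTBEAT check could read, for example:
await(() -> endpoint.getState() == EndpointStateMachine.EndPointStates.HEARTBEAT, 1000, 60_000);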

