HDDS-579. ContainerStateMachine should fail subsequent transactions per 
container in case one fails. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a619d120
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a619d120
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a619d120

Branch: refs/heads/ozone-0.3
Commit: a619d120a6c44bde2a846d61505a94f896e58e46
Parents: 0008357
Author: Jitendra Pandey <jiten...@apache.org>
Authored: Sat Oct 13 19:15:01 2018 -0700
Committer: Jitendra Pandey <jiten...@apache.org>
Committed: Mon Oct 15 13:30:53 2018 -0700

----------------------------------------------------------------------
 .../main/proto/DatanodeContainerProtocol.proto  |   4 +-
 .../container/common/impl/HddsDispatcher.java   |  63 +++++--
 .../container/keyvalue/KeyValueHandler.java     |  20 +-
 .../StorageContainerDatanodeProtocol.proto      |   1 +
 .../rpc/TestContainerStateMachineFailures.java  | 185 +++++++++++++++++++
 5 files changed, 242 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a619d120/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto 
b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 662df8f..da55db3 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -139,6 +139,7 @@ enum Result {
   CONTAINER_CHECKSUM_ERROR = 33;
   UNKNOWN_CONTAINER_TYPE = 34;
   BLOCK_NOT_COMMITTED = 35;
+  CONTAINER_UNHEALTHY = 36;
 }
 
 /**
@@ -161,7 +162,8 @@ enum ContainerLifeCycleState {
     OPEN = 1;
     CLOSING = 2;
     CLOSED = 3;
-    INVALID = 4;
+    UNHEALTHY = 4;
+    INVALID = 5;
 }
 
 message ContainerCommandRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a619d120/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index bb5002a..1849841 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -142,6 +142,26 @@ public class HddsDispatcher implements ContainerDispatcher 
{
     responseProto = handler.handle(msg, container);
     if (responseProto != null) {
       metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime);
+
+      // If the request is of Write Type and the container operation
+      // is unsuccessful, it implies the applyTransaction on the container
+      // failed. All subsequent transactions on the container should fail and
+      // hence replica will be marked unhealthy here. In this case, a close
+      // container action will be sent to SCM to close the container.
+      if (!HddsUtils.isReadOnly(msg)
+          && responseProto.getResult() != ContainerProtos.Result.SUCCESS) {
+        // If the container is open and the container operation has failed,
+        // it should be first marked unhealthy and the initiate the close
+        // container action. This also implies this is the first transaction
+        // which has failed, so the container is marked unhealthy right here.
+        // Once container is marked unhealthy, all the subsequent write
+        // transactions will fail with UNHEALTHY_CONTAINER exception.
+        if (container.getContainerState() == ContainerLifeCycleState.OPEN) {
+          container.getContainerData()
+              .setState(ContainerLifeCycleState.UNHEALTHY);
+          sendCloseContainerActionIfNeeded(container);
+        }
+      }
       return responseProto;
     } else {
       return ContainerUtils.unsupportedRequest(msg);
@@ -149,31 +169,46 @@ public class HddsDispatcher implements 
ContainerDispatcher {
   }
 
   /**
-   * If the container usage reaches the close threshold we send Close
-   * ContainerAction to SCM.
-   *
+   * If the container usage reaches the close threshold or the container is
+   * marked unhealthy we send Close ContainerAction to SCM.
    * @param container current state of container
    */
   private void sendCloseContainerActionIfNeeded(Container container) {
     // We have to find a more efficient way to close a container.
-    Boolean isOpen = Optional.ofNullable(container)
+    boolean isSpaceFull = isContainerFull(container);
+    boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
+    if (shouldClose) {
+      ContainerData containerData = container.getContainerData();
+      ContainerAction.Reason reason =
+          isSpaceFull ? ContainerAction.Reason.CONTAINER_FULL :
+              ContainerAction.Reason.CONTAINER_UNHEALTHY;
+      ContainerAction action = ContainerAction.newBuilder()
+          .setContainerID(containerData.getContainerID())
+          .setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
+      context.addContainerActionIfAbsent(action);
+    }
+  }
+
+  private boolean isContainerFull(Container container) {
+    boolean isOpen = Optional.ofNullable(container)
         .map(cont -> cont.getContainerState() == ContainerLifeCycleState.OPEN)
         .orElse(Boolean.FALSE);
     if (isOpen) {
       ContainerData containerData = container.getContainerData();
-      double containerUsedPercentage = 1.0f * containerData.getBytesUsed() /
-          containerData.getMaxSize();
-      if (containerUsedPercentage >= containerCloseThreshold) {
-        ContainerAction action = ContainerAction.newBuilder()
-            .setContainerID(containerData.getContainerID())
-            .setAction(ContainerAction.Action.CLOSE)
-            .setReason(ContainerAction.Reason.CONTAINER_FULL)
-            .build();
-        context.addContainerActionIfAbsent(action);
-      }
+      double containerUsedPercentage =
+          1.0f * containerData.getBytesUsed() / containerData.getMaxSize();
+      return containerUsedPercentage >= containerCloseThreshold;
+    } else {
+      return false;
     }
   }
 
+  private boolean isContainerUnhealthy(Container container) {
+    return Optional.ofNullable(container).map(
+        cont -> (cont.getContainerState() == 
ContainerLifeCycleState.UNHEALTHY))
+        .orElse(Boolean.FALSE);
+  }
+
   @Override
   public Handler getHandler(ContainerProtos.ContainerType containerType) {
     return handlers.get(containerType);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a619d120/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 922db2a..4c87b19 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -79,22 +79,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.BLOCK_NOT_COMMITTED;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.DELETE_ON_OPEN_CONTAINER;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.GET_SMALL_FILE_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.INVALID_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.PUT_SMALL_FILE_ERROR;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Stage;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -819,6 +804,9 @@ public class KeyValueHandler extends Handler {
       case CLOSED:
         result = CLOSED_CONTAINER_IO;
         break;
+      case UNHEALTHY:
+        result = CONTAINER_UNHEALTHY;
+        break;
       case INVALID:
         result = INVALID_CONTAINER_STATE;
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a619d120/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index f8fb32d..72d48a6 100644
--- 
a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -159,6 +159,7 @@ message ContainerAction {
 
   enum Reason {
     CONTAINER_FULL = 1;
+    CONTAINER_UNHEALTHY = 2;
   }
 
   required int64 containerID = 1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a619d120/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
new file mode 100644
index 0000000..0e593fb
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ContainerAction.Action;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ContainerAction.Reason;
+import org.apache.hadoop.hdds.scm.container.
+    common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.
+    HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.
+    HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+    HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
+    OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests the containerStateMachine failure handling.
+ */
+
+public class TestContainerStateMachineFailures {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String path;
+  private static int chunkSize;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    path = GenericTestUtils
+        .getTempPath(TestContainerStateMachineFailures.class.getSimpleName());
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+
+    chunkSize = (int) OzoneConsts.MB;
+
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, 
TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
+            .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    volumeName = "testcontainerstatemachinefailures";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testContainerStateMachineFailures() throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE);
+    key.write("ratis".getBytes());
+
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+        .build();
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+
+    long containerID = omKeyLocationInfo.getContainerID();
+    // delete the container dir
+    FileUtil.fullyDelete(new File(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            
.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
+            .getContainerPath()));
+    try {
+      // flush will throw an exception
+      key.flush();
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
+    }
+
+    // Make sure the container is marked unhealthy
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerState()
+            == ContainerProtos.ContainerLifeCycleState.UNHEALTHY);
+    try {
+      // subsequent requests will fail with unhealthy container exception
+      key.close();
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
+      Assert.assertTrue(((StorageContainerException) 
ioe.getCause()).getResult()
+          == ContainerProtos.Result.CONTAINER_UNHEALTHY);
+    }
+    StorageContainerDatanodeProtocolProtos.ContainerAction action =
+        StorageContainerDatanodeProtocolProtos.ContainerAction.newBuilder()
+            .setContainerID(containerID).setAction(Action.CLOSE)
+            .setReason(Reason.CONTAINER_UNHEALTHY)
+            .build();
+
+    // Make sure the container close action is initiated to SCM.
+    Assert.assertTrue(
+        
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext()
+            .getAllPendingContainerActions().contains(action));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to