This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2281
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit a4013be81d703e7eee8343b25e1f0e76b2802415
Author: Shashikant Banerjee <shashik...@apache.org>
AuthorDate: Fri Oct 11 01:53:44 2019 +0530

    HDDS-2281. ContainerStateMachine#handleWriteChunk should ignore close
    container exception.
---
 .../server/ratis/ContainerStateMachine.java        |  12 ++-
 .../rpc/TestContainerStateMachineFailures.java     | 108 +++++++++++++++++++++
 .../ozone/container/ContainerTestHelper.java       |  19 +++-
 3 files changed, 137 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index b89ec73..a124448 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -460,6 +460,10 @@ public class ContainerStateMachine extends 
BaseStateMachine {
             LOG.error(gid + ": writeChunk writeStateMachineData failed: 
blockId"
                 + write.getBlockID() + " logIndex " + entryIndex + " chunkName 
"
                 + write.getChunkData().getChunkName() + e);
+            metrics.incNumWriteDataFails();
+            // Write chunks go in parallel. It's possible that one write chunk
+            // sees the stateMachine marked unhealthy by another parallel
+            // thread.
+            stateMachineHealthy.set(false);
             raftFuture.completeExceptionally(e);
             throw e;
           }
@@ -474,7 +478,9 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     // Remove the future once it finishes execution from the
     // writeChunkFutureMap.
     writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS) {
+      if (r.getResult() != ContainerProtos.Result.SUCCESS
+          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
         StorageContainerException sce =
             new StorageContainerException(r.getMessage(), r.getResult());
         LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
@@ -482,6 +488,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
             write.getChunkData().getChunkName() + " Error message: " +
             r.getMessage() + " Container Result: " + r.getResult());
         metrics.incNumWriteDataFails();
+        stateMachineHealthy.set(false);
         raftFuture.completeExceptionally(sce);
       } else {
         metrics.incNumBytesWrittenCount(
@@ -584,6 +591,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
               + "{} Container Result: {}", gid, response.getCmdType(), index,
           response.getMessage(), response.getResult());
+      stateMachineHealthy.set(false);
       throw sce;
     }
 
@@ -739,6 +747,8 @@ public class ContainerStateMachine extends BaseStateMachine 
{
               LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
                       + "{} exception {}", gid, requestProto.getCmdType(),
                   index, e);
+              stateMachineHealthy.compareAndSet(true, false);
+              metrics.incNumApplyTransactionsFails();
               applyTransactionFuture.completeExceptionally(e);
               throw e;
             }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 9ac45b8..8dde3f7 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -26,6 +26,8 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -62,7 +64,10 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
@@ -78,6 +83,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -418,6 +424,108 @@ public class TestContainerStateMachineFailures {
     Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
   }
 
+  // The test injects multiple write chunk requests along with closed container
+  // request thereby inducing a situation where a writeStateMachine call
+  // gets executed when the closed container apply completes thereby
+  // failing writeStateMachine call. In any case, our stateMachine should
+  // not be marked unhealthy and pipeline should not fail if container gets
+  // closed here.
+  @Test
+  public void testWriteStateMachineDataIdempotencyWithClosedContainer()
+      throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis-1", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+    KeyOutputStream groupOutputStream = (KeyOutputStream) 
key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    ContainerData containerData =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    key.close();
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    // Since the snapshot threshold is set to 1, since there are
+    // applyTransactions, we should see snapshots
+    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotNull(snapshot);
+    long containerID = omKeyLocationInfo.getContainerID();
+    Pipeline pipeline = cluster.getStorageContainerLocationClient()
+        .getContainerWithPipeline(containerID).getPipeline();
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    CountDownLatch latch = new CountDownLatch(100);
+    int count = 0;
+    AtomicInteger failCount = new AtomicInteger(0);
+    Runnable r1 = () -> {
+      try {
+        ContainerProtos.ContainerCommandRequestProto.Builder request =
+            ContainerProtos.ContainerCommandRequestProto.newBuilder();
+        request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+        request.setCmdType(ContainerProtos.Type.CloseContainer);
+        request.setContainerID(containerID);
+        request.setCloseContainer(
+            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+        xceiverClient.sendCommand(request.build());
+      } catch (IOException e) {
+        failCount.incrementAndGet();
+      }
+    };
+    Runnable r2 = () -> {
+      try {
+        xceiverClient.sendCommand(ContainerTestHelper
+            .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(),
+                1024, new Random().nextInt()));
+        latch.countDown();
+      } catch (IOException e) {
+        latch.countDown();
+        if (!(HddsClientUtils
+            .checkForException(e) instanceof ContainerNotOpenException)) {
+          failCount.incrementAndGet();
+        }
+      }
+    };
+
+    for (int i=0 ; i < 100; i++) {
+      count++;
+      new Thread(r2).start();
+    }
+
+    new Thread(r1).start();
+    latch.await(600, TimeUnit.SECONDS);
+    if (failCount.get() > 0) {
+      fail("testWriteStateMachineDataIdempotencyWithClosedContainer failed");
+    }
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet().getContainer(containerID)
+            .getContainerState()
+            == ContainerProtos.ContainerDataProto.State.CLOSED);
+    Assert.assertTrue(stateMachine.isStateMachineHealthy());
+    try {
+      stateMachine.takeSnapshot();
+    } catch (IOException ioe) {
+      Assert.fail("Exception should not be thrown");
+    }
+    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
+
+  }
+
   @Test
   public void testValidateBCSIDOnDnRestart() throws Exception {
     OzoneOutputStream key =
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 395bda0..0b8dffc 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -227,6 +227,23 @@ public final class ContainerTestHelper {
       Pipeline pipeline, BlockID blockID, int datalen) throws IOException {
     LOG.trace("writeChunk {} (blockID={}) to pipeline=",
         datalen, blockID, pipeline);
+    return getWriteChunkRequest(pipeline, blockID, datalen, 0);
+  }
+
+  /**
+   * Returns a writeChunk Request.
+   *
+   * @param pipeline - A set of machines where this container lives.
+   * @param blockID - Block ID of the chunk.
+   * @param datalen - Length of data.
+   * @return ContainerCommandRequestProto
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  public static ContainerCommandRequestProto getWriteChunkRequest(
+      Pipeline pipeline, BlockID blockID, int datalen, int seq) throws 
IOException {
+    LOG.trace("writeChunk {} (blockID={}) to pipeline=",
+        datalen, blockID, pipeline);
     ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
         ContainerProtos.WriteChunkRequestProto
             .newBuilder();
@@ -234,7 +251,7 @@ public final class ContainerTestHelper {
     writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
 
     ByteBuffer data = getData(datalen);
-    ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
+    ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, datalen);
     setDataChecksum(info, data);
 
     writeRequest.setChunkData(info.getProtoBufMessage());


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org

Reply via email to