This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 66aadb0  HDDS-6109. Preserve the underlying exception raised in client lib. (#2989)
66aadb0 is described below

commit 66aadb02df2470c9e120e0efa92b2c88a101e676
Author: Ritesh H Shukla <[email protected]>
AuthorDate: Thu Jan 20 23:21:11 2022 -0800

    HDDS-6109. Preserve the underlying exception raised in client lib. (#2989)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   4 +-
 .../rpc/TestContainerStateMachineFailures.java     | 159 ++++++++++++++++++---
 2 files changed, 145 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 859d808..150c418 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -487,7 +487,7 @@ public class BlockOutputStream extends OutputStream {
       } catch (Throwable e) {
         String msg = "Failed to flush. error: " + e.getMessage();
         LOG.error(msg, e);
-        throw new RuntimeException(msg, e);
+        throw e;
       }
     }
   }
@@ -553,7 +553,7 @@ public class BlockOutputStream extends OutputStream {
       } catch (Throwable e) {
         String msg = "Failed to flush. error: " + e.getMessage();
         LOG.error(msg, e);
-        throw new RuntimeException(msg, e);
+        throw e;
       } finally {
         cleanup(false);
       }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 54d9df3..78a73cf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -29,9 +30,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -56,20 +57,23 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.LambdaTestUtils;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -94,7 +98,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.jupiter.api.BeforeEach;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -137,8 +140,8 @@ public class TestContainerStateMachineFailures {
 
     RatisClientConfig ratisClientConfig =
         conf.getObject(RatisClientConfig.class);
-    ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
-    ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
+    ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(20));
+    ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(20));
     conf.setFromObject(ratisClientConfig);
 
     DatanodeRatisServerConfig ratisServerConfig =
@@ -150,7 +153,7 @@ public class TestContainerStateMachineFailures {
     RatisClientConfig.RaftConfig raftClientConfig =
         conf.getObject(RatisClientConfig.RaftConfig.class);
     raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
-    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(20));
     conf.setFromObject(raftClientConfig);
 
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
@@ -171,16 +174,6 @@ public class TestContainerStateMachineFailures {
     random = new Random();
   }
 
-  @BeforeEach
-  public void restartDatanode()
-      throws InterruptedException, TimeoutException, AuthenticationException,
-      IOException {
-    for (int i=0; i < cluster.getHddsDatanodes().size(); i++) {
-      cluster.restartHddsDatanode(i, true);
-    }
-    cluster.restartStorageContainerManager(true);
-  }
-
   /**
    * Shutdown MiniDFSCluster.
    */
@@ -679,4 +672,138 @@ public class TestContainerStateMachineFailures {
 
     r2.run();
   }
+
+  /**
+   * Writes to key "ratis1", deletes the chunks directory of the key's
+   * container on follower datanodes of its pipeline, then keeps writing and
+   * expects the client to complete flush/write/close without an error.
+   * Finally checks that the key has two block locations and the full
+   * written content.
+   *
+   * <p>NOTE(review): this "Single" test passes failureCount=2 to
+   * induceFollowerFailure while the "Dual" test passes 1; confirm the test
+   * names match the intended number of failed followers.
+   */
+  @Test
+  public void testContainerStateMachineSingleFailureRetry()
+      throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis1", 1024, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+
+    key.write("ratis".getBytes(UTF_8));
+    key.flush();
+    key.write("ratis".getBytes(UTF_8));
+    key.write("ratis".getBytes(UTF_8));
+
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.
+        getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+
+    induceFollowerFailure(omKeyLocationInfo, 2);
+
+    // None of these calls should fail. Any exception is deliberately left
+    // to propagate so JUnit reports its type and full stack trace, instead
+    // of only its message as an Assert.fail(...) wrapper would.
+    key.flush();
+    key.write("ratis".getBytes(UTF_8));
+    key.flush();
+    key.close();
+
+    validateData("ratis1", 2, "ratisratisratisratis");
+  }
+
+  /**
+   * Writes to key "ratis2", deletes the chunks directory of the key's
+   * container on one follower datanode of its pipeline, then keeps writing
+   * and expects the client to complete flush/write/close without an error.
+   * Finally checks that "ratis2" has two block locations and the full
+   * written content.
+   */
+  @Test
+  public void testContainerStateMachineDualFailureRetry()
+      throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis2", 1024, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+
+    key.write("ratis".getBytes(UTF_8));
+    key.flush();
+    key.write("ratis".getBytes(UTF_8));
+    key.write("ratis".getBytes(UTF_8));
+
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.
+        getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+
+    induceFollowerFailure(omKeyLocationInfo, 1);
+
+    // None of these calls should fail. Any exception is deliberately left
+    // to propagate so JUnit reports its type and full stack trace.
+    key.flush();
+    key.write("ratis".getBytes(UTF_8));
+    key.flush();
+    key.close();
+
+    // Validate the key this test wrote ("ratis2"), not another test's key.
+    validateData("ratis2", 2, "ratisratisratisratis");
+  }
+
+  /**
+   * Deletes the chunks directory of the given block's container on
+   * non-leader datanodes of the block's pipeline.
+   *
+   * <p>Followers are visited in the iteration order of the datanode set.
+   * Processing stops once {@code failureCount} followers have been visited.
+   * A follower that does not host the container still counts towards
+   * {@code failureCount}.
+   *
+   * @param omKeyLocationInfo block whose pipeline and container are targeted
+   * @param failureCount maximum number of followers to visit
+   */
+  private void induceFollowerFailure(OmKeyLocationInfo omKeyLocationInfo,
+                                     int failureCount) {
+    UUID leader = omKeyLocationInfo.getPipeline().getLeaderId();
+    long containerID = omKeyLocationInfo.getContainerID();
+    Set<HddsDatanodeService> datanodeSet =
+        TestHelper.getDatanodeServices(cluster,
+            omKeyLocationInfo.getPipeline());
+    int count = 0;
+    for (HddsDatanodeService dn : datanodeSet) {
+      UUID dnUuid = dn.getDatanodeDetails().getUuid();
+      if (dnUuid.equals(leader)) {
+        continue;
+      }
+      count++;
+      Container container = dn
+          .getDatanodeStateMachine()
+          .getContainer()
+          .getContainerSet()
+          .getContainer(containerID);
+      if (container != null) {
+        ContainerData containerData = container.getContainerData();
+        Assert.assertTrue(containerData instanceof KeyValueContainerData);
+        KeyValueContainerData keyValueContainerData =
+            (KeyValueContainerData) containerData;
+        // Remove the container's on-disk chunk files on this follower.
+        FileUtil.fullyDelete(
+            new File(keyValueContainerData.getChunksPath()));
+      }
+      if (count == failureCount) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Checks that the latest version of {@code key} has
+   * {@code locationCount} block locations and that its content, decoded as
+   * UTF-8, equals {@code payload}.
+   *
+   * @param key name of the key in the test volume/bucket
+   * @param locationCount expected number of block locations
+   * @param payload expected full content of the key
+   * @throws IOException if the key lookup or read fails; propagated so the
+   *     test report keeps the original exception and stack trace
+   */
+  private void validateData(String key, int locationCount, String payload)
+      throws IOException {
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(key)
+        .setRefreshPipeline(true)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(omKeyArgs);
+    Assert.assertEquals(locationCount,
+        keyInfo.getLatestVersionLocations().getLocationListCount());
+
+    // Size the buffer one byte past the expected payload so any trailing
+    // data in the key shows up as a mismatch. Loop because a single read()
+    // may return fewer bytes than requested.
+    byte[] buffer =
+        new byte[payload.getBytes(StandardCharsets.UTF_8).length + 1];
+    int total = 0;
+    try (OzoneInputStream in = objectStore
+        .getVolume(volumeName)
+        .getBucket(bucketName)
+        .readKey(key)) {
+      int n;
+      while (total < buffer.length
+          && (n = in.read(buffer, total, buffer.length - total)) > 0) {
+        total += n;
+      }
+    }
+    String response = new String(ArrayUtils.subarray(buffer, 0, total),
+        StandardCharsets.UTF_8);
+    Assert.assertEquals(payload, response);
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to