This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 66aadb0 HDDS-6109. Preserve the underlying exception raised in client lib. (#2989)
66aadb0 is described below
commit 66aadb02df2470c9e120e0efa92b2c88a101e676
Author: Ritesh H Shukla <[email protected]>
AuthorDate: Thu Jan 20 23:21:11 2022 -0800
HDDS-6109. Preserve the underlying exception raised in client lib. (#2989)
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 4 +-
.../rpc/TestContainerStateMachineFailures.java | 159 ++++++++++++++++++---
2 files changed, 145 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 859d808..150c418 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -487,7 +487,7 @@ public class BlockOutputStream extends OutputStream {
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
- throw new RuntimeException(msg, e);
+ throw e;
}
}
}
@@ -553,7 +553,7 @@ public class BlockOutputStream extends OutputStream {
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
- throw new RuntimeException(msg, e);
+ throw e;
} finally {
cleanup(false);
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 54d9df3..78a73cf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
@@ -29,9 +30,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -56,20 +57,23 @@ import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import
org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.ozone.test.LambdaTestUtils;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -94,7 +98,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.jupiter.api.BeforeEach;
/**
* Tests the containerStateMachine failure handling.
@@ -137,8 +140,8 @@ public class TestContainerStateMachineFailures {
RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
- ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(10));
- ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
+ ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(20));
+ ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(20));
conf.setFromObject(ratisClientConfig);
DatanodeRatisServerConfig ratisServerConfig =
@@ -150,7 +153,7 @@ public class TestContainerStateMachineFailures {
RatisClientConfig.RaftConfig raftClientConfig =
conf.getObject(RatisClientConfig.RaftConfig.class);
raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
- raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+ raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(20));
conf.setFromObject(raftClientConfig);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
@@ -171,16 +174,6 @@ public class TestContainerStateMachineFailures {
random = new Random();
}
- @BeforeEach
- public void restartDatanode()
- throws InterruptedException, TimeoutException, AuthenticationException,
- IOException {
- for (int i=0; i < cluster.getHddsDatanodes().size(); i++) {
- cluster.restartHddsDatanode(i, true);
- }
- cluster.restartStorageContainerManager(true);
- }
-
/**
* Shutdown MiniDFSCluster.
*/
@@ -679,4 +672,138 @@ public class TestContainerStateMachineFailures {
r2.run();
}
+
+  /**
+   * Writes "ratis" three times to key "ratis1" (flushing after the first
+   * write), then deletes the open container's chunk directory on up to two
+   * non-leader replicas of the block's pipeline. The subsequent flush, write,
+   * flush and close must all succeed, and the key must end up with two block
+   * locations in its latest version and the content "ratisratisratisratis".
+   * NOTE(review): the second block location presumably comes from the client
+   * retrying on a new block after the follower failure — confirm.
+   */
+  @Test
+  public void testContainerStateMachineSingleFailureRetry()
+      throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis1", 1024, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+
+    key.write("ratis".getBytes(UTF_8));
+    key.flush();
+    key.write("ratis".getBytes(UTF_8));
+    key.write("ratis".getBytes(UTF_8));
+
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.
+        getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+
+    induceFollowerFailure(omKeyLocationInfo, 2);
+
+    try {
+      key.flush();
+      key.write("ratis".getBytes(UTF_8));
+      key.flush();
+      key.close();
+    } catch (Exception ioe) {
+      // Should not fail. Keep the client exception as the cause so the
+      // failure report shows where the client actually broke.
+      throw new AssertionError("Exception " + ioe.getMessage(), ioe);
+    }
+    validateData("ratis1", 2, "ratisratisratisratis");
+  }
+
+  /**
+   * Writes "ratis" three times to key "ratis2" (flushing after the first
+   * write), then deletes the open container's chunk directory on one
+   * non-leader replica of the block's pipeline. The subsequent flush, write,
+   * flush and close must all succeed, and key "ratis2" must end up with two
+   * block locations in its latest version and the content
+   * "ratisratisratisratis".
+   * NOTE(review): this test passes failureCount 1 while
+   * testContainerStateMachineSingleFailureRetry passes 2, which reads
+   * inverted relative to the test names — confirm intent.
+   */
+  @Test
+  public void testContainerStateMachineDualFailureRetry()
+      throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis2", 1024, ReplicationType.RATIS,
+                ReplicationFactor.THREE, new HashMap<>());
+
+    key.write("ratis".getBytes(UTF_8));
+    key.flush();
+    key.write("ratis".getBytes(UTF_8));
+    key.write("ratis".getBytes(UTF_8));
+
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.
+        getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+
+    induceFollowerFailure(omKeyLocationInfo, 1);
+
+    try {
+      key.flush();
+      key.write("ratis".getBytes(UTF_8));
+      key.flush();
+      key.close();
+    } catch (Exception ioe) {
+      // Should not fail. Keep the client exception as the cause so the
+      // failure report shows where the client actually broke.
+      throw new AssertionError("Exception " + ioe.getMessage(), ioe);
+    }
+    // Validate the key this test wrote ("ratis2"), not the other test's key.
+    validateData("ratis2", 2, "ratisratisratisratis");
+  }
+
+  /**
+   * Deletes the chunks directory of the block's container on up to
+   * {@code failureCount} datanodes of the block's pipeline that are not the
+   * pipeline leader. Every non-leader datanode visited counts towards
+   * {@code failureCount}, even if it does not host the container.
+   * Presumably this makes later chunk writes on those replicas fail —
+   * confirm against the datanode write path.
+   *
+   * @param omKeyLocationInfo location of the block whose container is
+   *     damaged
+   * @param failureCount maximum number of non-leader datanodes to visit
+   */
+  private void induceFollowerFailure(OmKeyLocationInfo omKeyLocationInfo,
+      int failureCount) {
+    UUID leader = omKeyLocationInfo.getPipeline().getLeaderId();
+    long containerID = omKeyLocationInfo.getContainerID();
+    Set<HddsDatanodeService> datanodeSet =
+        TestHelper.getDatanodeServices(cluster,
+            omKeyLocationInfo.getPipeline());
+    int count = 0;
+    for (HddsDatanodeService dn : datanodeSet) {
+      UUID dnUuid = dn.getDatanodeDetails().getUuid();
+      if (dnUuid.equals(leader)) {
+        continue;
+      }
+      count++;
+      Container container = dn
+          .getDatanodeStateMachine()
+          .getContainer()
+          .getContainerSet()
+          .getContainer(containerID);
+      if (container != null) {
+        ContainerData containerData = container.getContainerData();
+        Assert.assertTrue(containerData instanceof KeyValueContainerData);
+        KeyValueContainerData keyValueContainerData =
+            (KeyValueContainerData) containerData;
+        File chunksDir = new File(keyValueContainerData.getChunksPath());
+        // A silently failed delete would leave the follower healthy and let
+        // the calling test pass without exercising the failure path.
+        Assert.assertTrue("Failed to delete " + chunksDir,
+            FileUtil.fullyDelete(chunksDir));
+      }
+      if (count == failureCount) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Looks up {@code key} in the test volume/bucket through the Ozone Manager
+   * and checks the block-location count of its latest version and its
+   * content. At most the first 1024 bytes of the key are read and compared.
+   *
+   * @param key name of the key in {@code volumeName}/{@code bucketName}
+   * @param locationCount expected number of block locations in the latest
+   *     key version
+   * @param payload expected key content, decoded as UTF-8
+   * @throws AssertionError if a check fails or the lookup/read throws an
+   *     IOException (kept as the cause)
+   */
+  private void validateData(String key, int locationCount, String payload) {
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(key)
+        .setRefreshPipeline(true)
+        .build();
+    try {
+      OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(omKeyArgs);
+      Assert.assertEquals(locationCount,
+          keyInfo.getLatestVersionLocations().getLocationListCount());
+
+      byte[] buffer = new byte[1024];
+      int length = 0;
+      try (OzoneInputStream o = objectStore
+          .getVolume(volumeName)
+          .getBucket(bucketName)
+          .readKey(key)) {
+        // A single read() may return fewer bytes than are available, so
+        // keep reading until EOF or until the buffer is full.
+        int n;
+        while (length < buffer.length
+            && (n = o.read(buffer, length, buffer.length - length)) > 0) {
+          length += n;
+        }
+      }
+      // Use the number of bytes actually read rather than searching for a
+      // zero byte, which fails when the buffer contains no zero byte.
+      String response = new String(ArrayUtils.subarray(buffer, 0, length),
+          StandardCharsets.UTF_8);
+      Assert.assertEquals(payload, response);
+    } catch (IOException e) {
+      throw new AssertionError("Exception not expected " + e.getMessage(), e);
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]