Repository: hadoop Updated Branches: refs/heads/trunk 75291e6d5 -> 671fd6524
http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 43517ae..935423d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.client.rpc; -import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -27,11 +26,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; -import org.apache.hadoop.ozone.HddsDatanodeService; -import org.apache.hadoop.hdds.scm.container.common.helpers. 
- StorageContainerException; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -55,15 +49,17 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.slf4j.event.Level; import java.io.IOException; -import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** * Tests Close Container Exception handling by Ozone Client. @@ -79,7 +75,6 @@ public class TestCloseContainerHandlingByClient { private static String volumeName; private static String bucketName; private static String keyString; - private static int maxRetries; /** * Create a MiniDFSCluster for testing. 
@@ -91,15 +86,14 @@ public class TestCloseContainerHandlingByClient { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); - maxRetries = 100; - conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries); - conf.set(OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, "200ms"); chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; - conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setQuietMode(false); conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4)); - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(3).build(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); @@ -121,44 +115,29 @@ public class TestCloseContainerHandlingByClient { } } - private static String fixedLengthString(String string, int length) { - return String.format("%1$" + length + "s", string); - } - @Test public void testBlockWritesWithFlushAndClose() throws Exception { String keyName = "standalone"; - OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, 0); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); // write data more than 1 chunk - byte[] data = - fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + byte[] data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); key.write(data); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - 
.setType(HddsProtos.ReplicationType.STAND_ALONE) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); key.write(data); key.flush(); key.close(); // read the key from OM again and match the length.The length will still // be the equal to the original data size. OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); - List<OmKeyLocationInfo> keyLocationInfos = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - //we have written two blocks - Assert.assertEquals(2, keyLocationInfos.size()); - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); - Assert.assertEquals(data.length - (data.length % chunkSize), - omKeyLocationInfo.getLength()); - Assert.assertEquals(data.length + (data.length % chunkSize), - keyLocationInfos.get(1).getLength()); Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); // Written the same data twice @@ -170,37 +149,24 @@ public class TestCloseContainerHandlingByClient { @Test public void testBlockWritesCloseConsistency() throws Exception { String keyName = "standalone2"; - OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); // write data more than 1 chunk - byte[] data = - fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + byte[] data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); key.write(data); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) + 
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); key.close(); // read the key from OM again and match the length.The length will still // be the equal to the original data size. OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); - List<OmKeyLocationInfo> keyLocationInfos = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - // Though we have written only block initially, the close will hit - // closeContainerException and remaining data in the chunkOutputStream - // buffer will be copied into a different allocated block and will be - // committed. - Assert.assertEquals(2, keyLocationInfos.size()); - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); - Assert.assertEquals(data.length - (data.length % chunkSize), - omKeyLocationInfo.getLength()); - Assert.assertEquals(data.length % chunkSize, - keyLocationInfos.get(1).getLength()); Assert.assertEquals(data.length, keyInfo.getDataSize()); validateData(keyName, data); } @@ -210,29 +176,30 @@ public class TestCloseContainerHandlingByClient { String keyName = "standalone3"; OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize)); + createKey(keyName, ReplicationType.RATIS, (4 * blockSize)); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); - // write data for 3 blocks and 1 more chunk - byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes(); + // write data more than 1 chunk + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize)) + .getBytes(); 
Assert.assertEquals(data.length, 3 * blockSize); key.write(data); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, - HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); // write 1 more block worth of data. It will fail and new block will be // allocated - key.write(fixedLengthString(keyString, blockSize).getBytes()); + key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize) + .getBytes()); key.close(); // read the key from OM again and match the length.The length will still @@ -253,10 +220,10 @@ public class TestCloseContainerHandlingByClient { @Test public void testMultiBlockWrites2() throws Exception { - String keyName = "standalone4"; + String keyName = "ratis2"; long dataLength; OzoneOutputStream key = - createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize); + createKey(keyName, ReplicationType.RATIS, 4 * blockSize); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); @@ -264,21 +231,21 @@ public class TestCloseContainerHandlingByClient { // With the initial size provided, it should have pre allocated 4 blocks Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); String dataString = - fixedLengthString(keyString, (3 * blockSize + chunkSize)); + ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize)); byte[] data = dataString.getBytes(); key.write(data); // 3 block are completely written to the DataNode in 3 blocks. 
// Data of length half of chunkSize resides in the chunkOutput stream buffer - String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2); + String dataString2 = + ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2); key.write(dataString2.getBytes()); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.STAND_ALONE) - .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); key.close(); // read the key from OM again and match the length.The length will still @@ -290,9 +257,8 @@ public class TestCloseContainerHandlingByClient { // closeContainerException and remaining data in the chunkOutputStream // buffer will be copied into a different allocated block and will be // committed. 
- Assert.assertEquals(5, keyLocationInfos.size()); - dataLength = 3 * blockSize + (long) (1.5 * chunkSize); - Assert.assertEquals(dataLength, keyInfo.getDataSize()); + Assert.assertEquals(dataString.concat(dataString2).getBytes().length, + keyInfo.getDataSize()); validateData(keyName, dataString.concat(dataString2).getBytes()); } @@ -301,14 +267,14 @@ public class TestCloseContainerHandlingByClient { String keyName = "standalone5"; int keyLen = 4 * blockSize; - OzoneOutputStream key = - createKey(keyName, ReplicationType.RATIS, keyLen); + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); // write data 3 blocks and one more chunk - byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes(); + byte[] writtenData = + ContainerTestHelper.getFixedLengthString(keyString, keyLen).getBytes(); byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize); Assert.assertEquals(data.length, 3 * blockSize + chunkSize); key.write(data); @@ -316,17 +282,14 @@ public class TestCloseContainerHandlingByClient { Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName) - .setType(HddsProtos.ReplicationType.RATIS) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) .build(); - waitForContainerClose(keyName, key, - HddsProtos.ReplicationType.RATIS); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); // write 3 more chunks worth of data. It will fail and new block will be // allocated. 
This write completes 4 blocks worth of data written to key - data = Arrays - .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); + data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); key.write(data); key.close(); @@ -345,8 +308,6 @@ public class TestCloseContainerHandlingByClient { // closeContainerException and remaining data in the chunkOutputStream // buffer will be copied into a different allocated block and will be // committed. - Assert.assertEquals(5, keyLocationInfos.size()); - Assert.assertEquals(4 * blockSize, keyInfo.getDataSize()); long length = 0; for (OmKeyLocationInfo locationInfo : keyLocationInfos) { length += locationInfo.getLength(); @@ -378,9 +339,9 @@ public class TestCloseContainerHandlingByClient { cluster.getStorageContainerManager().getEventQueue() .fireEvent(SCMEvents.CLOSE_CONTAINER, ContainerID.valueof(containerID)); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager() - .getContainer(ContainerID.valueof(containerID)); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); @@ -406,8 +367,8 @@ public class TestCloseContainerHandlingByClient { .isContainerPresent(cluster, containerID, dn))) { for (DatanodeDetails datanodeDetails : datanodes) { GenericTestUtils.waitFor(() -> ContainerTestHelper - .isContainerClosed(cluster, containerID, datanodeDetails), - 500, 15 * 1000); + .isContainerClosed(cluster, containerID, datanodeDetails), 500, + 15 * 1000); //double check if it's really closed // (waitFor also throws an exception) Assert.assertTrue(ContainerTestHelper @@ -425,29 +386,31 @@ public class TestCloseContainerHandlingByClient { public void testDiscardPreallocatedBlocks() throws Exception { String keyName = "discardpreallocatedblocks"; OzoneOutputStream key = - 
createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize); + createKey(keyName, ReplicationType.RATIS, 2 * blockSize); ChunkGroupOutputStream groupOutputStream = (ChunkGroupOutputStream) key.getOutputStream(); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); // With the initial size provided, it should have pre allocated 4 blocks Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); - String dataString = fixedLengthString(keyString, (1 * blockSize)); + String dataString = + ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); byte[] data = dataString.getBytes(); key.write(data); List<OmKeyLocationInfo> locationInfos = new ArrayList<>(groupOutputStream.getLocationInfoList()); long containerID = locationInfos.get(0).getContainerID(); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager() - .getContainer(ContainerID.valueof(containerID)); + ContainerInfo container = + cluster.getStorageContainerManager().getContainerManager() + .getContainer(ContainerID.valueof(containerID)); Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager() .getPipeline(container.getPipelineID()); List<DatanodeDetails> datanodes = pipeline.getNodes(); Assert.assertEquals(1, datanodes.size()); - waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); - dataString = fixedLengthString(keyString, (1 * blockSize)); + waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); + dataString = + ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); data = dataString.getBytes(); key.write(data); Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); @@ -466,40 +429,28 @@ public class TestCloseContainerHandlingByClient { private OzoneOutputStream createKey(String keyName, ReplicationType type, long size) throws Exception { - ReplicationFactor factor = - type == ReplicationType.STAND_ALONE ? 
ReplicationFactor.ONE : - ReplicationFactor.THREE; - return objectStore.getVolume(volumeName).getBucket(bucketName) - .createKey(keyName, size, type, factor); + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); } private void validateData(String keyName, byte[] data) throws Exception { - byte[] readData = new byte[data.length]; - OzoneInputStream is = - objectStore.getVolume(volumeName).getBucket(bucketName) - .readKey(keyName); - is.read(readData); - MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha1.update(data); - MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - sha2.update(readData); - Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest())); - is.close(); + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); } @Test public void testBlockWriteViaRatis() throws Exception { String keyName = "ratis"; OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); - byte[] data = - fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + byte[] data = ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); key.write(data); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName). 
setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) - .setFactor(HddsProtos.ReplicationFactor.THREE) - .setKeyName(keyName).build(); + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS); @@ -510,79 +461,10 @@ public class TestCloseContainerHandlingByClient { // The write will fail but exception will be handled and length will be // updated correctly in OzoneManager once the steam is closed key.close(); - // read the key from OM again and match the length.The length will still - // be the equal to the original data size. OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); - List<OmKeyLocationInfo> keyLocationInfos = - keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); - //we have written two blocks - Assert.assertEquals(2, keyLocationInfos.size()); - OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0); - Assert.assertEquals(data.length - (data.length % chunkSize), - omKeyLocationInfo.getLength()); - Assert.assertEquals(data.length + (data.length % chunkSize), - keyLocationInfos.get(1).getLength()); - Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); String dataString = new String(data); dataString.concat(dataString); + Assert.assertEquals(2 * data.length, keyInfo.getDataSize()); validateData(keyName, dataString.getBytes()); } - - @Test - public void testRetriesOnBlockNotCommittedException() throws Exception { - String keyName = "blockcommitexceptiontest"; - OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); - GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE); - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG); - - 
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); - String dataString = fixedLengthString(keyString, (3 * chunkSize)); - key.write(dataString.getBytes()); - List<OmKeyLocationInfo> locationInfos = - groupOutputStream.getLocationInfoList(); - long containerID = locationInfos.get(0).getContainerID(); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager() - .getContainer(ContainerID.valueof(containerID)); - Pipeline pipeline = - cluster.getStorageContainerManager().getPipelineManager() - .getPipeline(container.getPipelineID()); - List<DatanodeDetails> datanodes = pipeline.getNodes(); - Assert.assertEquals(1, datanodes.size()); - // move the container on the datanode to Closing state, this will ensure - // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying - // to fetch the committed length - for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) { - if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) { - datanodeService.getDatanodeStateMachine().getContainer() - .getContainerSet().getContainer(containerID).getContainerData() - .setState(ContainerProtos.ContainerDataProto.State.CLOSED); - } - } - dataString = fixedLengthString(keyString, (chunkSize * 1 / 2)); - key.write(dataString.getBytes()); - try { - key.close(); - Assert.fail("Expected Exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe instanceof StorageContainerException); - Assert.assertTrue(((StorageContainerException) ioe).getResult() - == ContainerProtos.Result.BLOCK_NOT_COMMITTED); - } - // It should retry only for max retries times - for (int i = 1; i <= maxRetries; i++) { - Assert.assertTrue(logCapturer.getOutput() - .contains("Retrying GetCommittedBlockLength request")); - Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i)); - } - Assert.assertTrue(logCapturer.getOutput() - .contains("GetCommittedBlockLength request failed.")); - 
Assert.assertTrue(logCapturer.getOutput().contains( - "retries get failed due to exceeded maximum allowed retries number" - + ": " + maxRetries)); - logCapturer.stopCapturing(); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 73bff6f..c1827c9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container. - common.helpers.StorageContainerException; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; @@ -141,15 +139,8 @@ public class TestContainerStateMachineFailures { .getContainer().getContainerSet() .getContainer(omKeyLocationInfo.getContainerID()).getContainerData() .getContainerPath())); - try { - // flush will throw an exception for the second write as the container - // dir has been deleted. 
- key.flush(); - Assert.fail("Expected exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe.getCause() instanceof StorageContainerException); - } + key.close(); // Make sure the container is marked unhealthy Assert.assertTrue( cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() @@ -157,14 +148,5 @@ public class TestContainerStateMachineFailures { .getContainer(omKeyLocationInfo.getContainerID()) .getContainerState() == ContainerProtos.ContainerDataProto.State.UNHEALTHY); - try { - // subsequent requests will fail with unhealthy container exception - key.close(); - Assert.fail("Expected exception not thrown"); - } catch (IOException ioe) { - Assert.assertTrue(ioe instanceof StorageContainerException); - Assert.assertTrue(((StorageContainerException) ioe).getResult() - == ContainerProtos.Result.BLOCK_NOT_COMMITTED); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java new file mode 100644 index 0000000..dc6747f --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + 
* Tests Close Container Exception handling by Ozone Client. + */ +public class TestFailureHandlingByClient { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static OzoneClient client; + private static ObjectStore objectStore; + private static int chunkSize; + private static int blockSize; + private static String volumeName; + private static String bucketName; + private static String keyString; + private static int maxRetries; + + /** + * TODO: we will spawn new MiniOzoneCluster every time for each unit test + * invocation. Need to use the same instance for all tests. + */ + /** + * Create a MiniDFSCluster for testing. + * <p> + * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + maxRetries = 100; + chunkSize = (int) OzoneConsts.MB; + blockSize = 4 * chunkSize; + conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, 1); + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, 2); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5, + TimeUnit.SECONDS); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setQuietMode(false); + conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4)); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(6).build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + keyString = UUID.randomUUID().toString(); + volumeName = "datanodefailurehandlingtest"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + /** + 
* Shutdown MiniDFSCluster. + */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + // TODO: currently, shutting down 2 datanodes in Ratis leads to + // watchForCommit Api in RaftClient to hang forever. Once that gets + // fixed, we need to execute the tests with 2 node failures. + + @Test + public void testBlockWritesWithDnFailures() throws Exception { + String keyName = "ratis3"; + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper + .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes(); + key.write(data); + + // get the name of a valid container + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertTrue(locationInfoList.size() == 1); + long containerId = locationInfoList.get(0).getContainerID(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerId)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List<DatanodeDetails> datanodes = pipeline.getNodes(); + cluster.shutdownHddsDatanode(datanodes.get(0)); + // cluster.shutdownHddsDatanode(datanodes.get(1)); + // The write will fail but exception will be handled and length will be + // updated correctly in OzoneManager once the stream is closed + key.close(); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + Assert.assertEquals(data.length, 
keyInfo.getDataSize()); + validateData(keyName, data); + cluster.restartHddsDatanode(datanodes.get(0), true); + } + + @Test + public void testMultiBlockWritesWithDnFailures() throws Exception { + String keyName = "ratis3"; + OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0); + String data = + ContainerTestHelper + .getFixedLengthString(keyString, blockSize + chunkSize); + key.write(data.getBytes()); + + // get the name of a valid container + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertTrue(locationInfoList.size() == 2); + long containerId = locationInfoList.get(1).getContainerID(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerId)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List<DatanodeDetails> datanodes = pipeline.getNodes(); + cluster.shutdownHddsDatanode(datanodes.get(0)); + + // cluster.shutdownHddsDatanode(datanodes.get(1)); + // The write will fail but exception will be handled and length will be + // updated correctly in OzoneManager once the stream is closed + key.write(data.getBytes()); + key.close(); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize()); + validateData(keyName, data.concat(data).getBytes()); + cluster.restartHddsDatanode(datanodes.get(0), true); + } + + private OzoneOutputStream createKey(String keyName, 
ReplicationType type, + long size) throws Exception { + return ContainerTestHelper + .createKey(keyName, type, size, objectStore, volumeName, bucketName); + } + + private void validateData(String keyName, byte[] data) throws Exception { + ContainerTestHelper + .validateData(keyName, data, objectStore, volumeName, bucketName); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 7d002c3..7e9bab5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -21,10 +21,14 @@ package org.apache.hadoop.ozone.container; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -632,4 +636,34 @@ public final class ContainerTestHelper { return false; } + public static OzoneOutputStream 
createKey(String keyName, + ReplicationType type, long size, ObjectStore objectStore, + String volumeName, String bucketName) throws Exception { + org.apache.hadoop.hdds.client.ReplicationFactor factor = + type == ReplicationType.STAND_ALONE ? + org.apache.hadoop.hdds.client.ReplicationFactor.ONE : + org.apache.hadoop.hdds.client.ReplicationFactor.THREE; + return objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey(keyName, size, type, factor); + } + + public static void validateData(String keyName, byte[] data, + ObjectStore objectStore, String volumeName, String bucketName) + throws Exception { + byte[] readData = new byte[data.length]; + OzoneInputStream is = + objectStore.getVolume(volumeName).getBucket(bucketName) + .readKey(keyName); + is.read(readData); + MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha1.update(data); + MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + sha2.update(readData); + Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest())); + is.close(); + } + + public static String getFixedLengthString(String string, int length) { + return String.format("%1$" + length + "s", string); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index f7ba979..3a15b21 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -468,7 +468,7 @@ public class TestOzoneContainer { 
client.getPipeline(), blockID, 1024); CompletableFuture<ContainerProtos.ContainerCommandResponseProto> - response = client.sendCommandAsync(smallFileRequest); + response = client.sendCommandAsync(smallFileRequest).getResponse(); computeResults.add(response); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java index aa1df4c..ce91dbd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java @@ -119,7 +119,8 @@ public class TestXceiverClientMetrics { smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest( client.getPipeline(), blockID, 1024); CompletableFuture<ContainerProtos.ContainerCommandResponseProto> - response = client.sendCommandAsync(smallFileRequest); + response = + client.sendCommandAsync(smallFileRequest).getResponse(); computeResults.add(response); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java index 0f49ade..9680243 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java @@ -62,7 +62,7 @@ public class TestOzoneRestWithMiniCluster { private static OzoneConfiguration conf; private static ClientProtocol client; private static ReplicationFactor replicationFactor = ReplicationFactor.ONE; - private static ReplicationType replicationType = ReplicationType.STAND_ALONE; + private static ReplicationType replicationType = ReplicationType.RATIS; @Rule public ExpectedException exception = ExpectedException.none(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index a8df114..6a433b5 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -22,8 +22,7 @@ import com.google.common.base.Strings; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ozone.client.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -33,7 +32,6 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts.Versioning; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; @@ -63,6 +61,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; /** * A {@link StorageHandler} implementation that distributes object storage @@ -80,10 +79,10 @@ public final class DistributedStorageHandler implements StorageHandler { private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; private int chunkSize; - private final boolean useRatis; - private final HddsProtos.ReplicationType type; - private final HddsProtos.ReplicationFactor factor; - private final RetryPolicy retryPolicy; + private final long streamBufferFlushSize; + private final long streamBufferMaxSize; + private final long watchTimeout; + private final long blockSize; /** * Creates a new DistributedStorageHandler. 
@@ -100,17 +99,6 @@ public final class DistributedStorageHandler implements StorageHandler { this.ozoneManagerClient = ozoneManagerClient; this.storageContainerLocationClient = storageContainerLocation; this.xceiverClientManager = new XceiverClientManager(conf); - this.useRatis = conf.getBoolean( - ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); - - if(useRatis) { - type = HddsProtos.ReplicationType.RATIS; - factor = HddsProtos.ReplicationFactor.THREE; - } else { - type = HddsProtos.ReplicationType.STAND_ALONE; - factor = HddsProtos.ReplicationFactor.ONE; - } chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT); @@ -118,7 +106,6 @@ public final class DistributedStorageHandler implements StorageHandler { OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT); groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS, OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT); - retryPolicy = OzoneClientUtils.createRetryPolicy(conf); if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) { LOG.warn("The chunk size ({}) is not allowed to be more than" + " the maximum size ({})," @@ -126,6 +113,18 @@ public final class DistributedStorageHandler implements StorageHandler { chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; } + streamBufferFlushSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT); + streamBufferMaxSize = + conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, + OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT); + blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, + OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT); + watchTimeout = + conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); } 
@Override @@ -420,7 +419,10 @@ public final class DistributedStorageHandler implements StorageHandler { .setRequestID(args.getRequestID()) .setType(xceiverClientManager.getType()) .setFactor(xceiverClientManager.getFactor()) - .setRetryPolicy(retryPolicy) + .setStreamBufferFlushSize(streamBufferFlushSize) + .setStreamBufferMaxSize(streamBufferMaxSize) + .setBlockSize(blockSize) + .setWatchTimeout(watchTimeout) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java index a2df50d..3cf4416 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,6 +45,7 @@ public class TestDataValidate { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); @@ -86,6 +88,8 @@ public class TestDataValidate { randomKeyGenerator.setNumOfKeys(1); randomKeyGenerator.setKeySize(20971520); randomKeyGenerator.setValidateWrites(true); + 
randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); randomKeyGenerator.call(); Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); @@ -101,6 +105,8 @@ public class TestDataValidate { randomKeyGenerator.setNumOfBuckets(5); randomKeyGenerator.setNumOfKeys(10); randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); randomKeyGenerator.call(); Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/671fd652/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java index d21d399..e5bb8ae 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,6 +45,7 @@ public class TestRandomKeyGenerator { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); cluster = 
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); } @@ -65,6 +67,8 @@ public class TestRandomKeyGenerator { randomKeyGenerator.setNumOfVolumes(2); randomKeyGenerator.setNumOfBuckets(5); randomKeyGenerator.setNumOfKeys(10); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); randomKeyGenerator.call(); Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); @@ -81,6 +85,8 @@ public class TestRandomKeyGenerator { randomKeyGenerator.setNumOfKeys(10); randomKeyGenerator.setNumOfThreads(10); randomKeyGenerator.setKeySize(10240); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setType(ReplicationType.RATIS); randomKeyGenerator.call(); Assert.assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated()); Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
