HDDS-339. Add block length and blockId in PutKeyResponse. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/398d8955 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/398d8955 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/398d8955 Branch: refs/heads/HADOOP-15407 Commit: 398d89554398a38ffa1347524286cd437f94f3ae Parents: 15241c6 Author: Mukul Kumar Singh <msi...@apache.org> Authored: Fri Aug 10 23:45:56 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Fri Aug 10 23:45:56 2018 +0530 ---------------------------------------------------------------------- .../main/proto/DatanodeContainerProtocol.proto | 1 + .../container/keyvalue/KeyValueHandler.java | 18 +- .../container/keyvalue/helpers/KeyUtils.java | 50 +++- .../container/keyvalue/impl/KeyManagerImpl.java | 4 +- .../keyvalue/interfaces/KeyManager.java | 3 +- .../ozone/scm/TestCommittedBlockLengthAPI.java | 216 ---------------- .../TestGetCommittedBlockLengthAndPutKey.java | 254 +++++++++++++++++++ 7 files changed, 313 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index af06346..930f314 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -308,6 +308,7 @@ message PutKeyRequestProto { } message PutKeyResponseProto { + required GetCommittedBlockLengthResponseProto committedBlockLength = 1; } message GetKeyRequestProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index f4699dd..8364a77 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -421,6 +421,7 @@ public class KeyValueHandler extends Handler { ContainerCommandResponseProto handlePutKey( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + long blockLength; if (!request.hasPutKey()) { LOG.debug("Malformed Put Key request. trace ID: {}", request.getTraceID()); @@ -433,7 +434,7 @@ public class KeyValueHandler extends Handler { KeyData keyData = KeyData.getFromProtoBuf( request.getPutKey().getKeyData()); long numBytes = keyData.getProtoBufMessage().toByteArray().length; - commitKey(keyData, kvContainer); + blockLength = commitKey(keyData, kvContainer); metrics.incContainerBytesStats(Type.PutKey, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -443,7 +444,7 @@ public class KeyValueHandler extends Handler { request); } - return KeyUtils.getKeyResponseSuccess(request); + return KeyUtils.putKeyResponseSuccess(request, blockLength); } private void commitPendingKeys(KeyValueContainer kvContainer) @@ -456,12 +457,13 @@ public class KeyValueHandler extends Handler { } } - private void commitKey(KeyData keyData, KeyValueContainer kvContainer) + private long commitKey(KeyData keyData, KeyValueContainer kvContainer) throws IOException { Preconditions.checkNotNull(keyData); - keyManager.putKey(kvContainer, keyData); + long length = keyManager.putKey(kvContainer, keyData); //update the open key Map in containerManager this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID()); + return length; } /** * Handle Get Key operation. Calls KeyManager to process the request. @@ -662,8 +664,12 @@ public class KeyValueHandler extends Handler { request.getWriteChunk().getStage() == Stage.COMBINED) { metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() .getChunkData().getLen()); - // the openContainerBlockMap should be updated only while writing data - // not during COMMIT_STAGE of handling write chunk request. + } + + if (request.getWriteChunk().getStage() == Stage.COMMIT_DATA + || request.getWriteChunk().getStage() == Stage.COMBINED) { + // the openContainerBlockMap should be updated only during + // COMMIT_STAGE of handling write chunk request. openContainerBlockMap.addChunk(blockID, chunkInfoProto); } } catch (StorageContainerException ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java index 2be966d..a83d298 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java @@ -27,6 +27,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .GetKeyResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + GetCommittedBlockLengthResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + PutKeyResponseProto; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; @@ -123,6 +127,26 @@ public final class KeyUtils { } /** + * Returns putKey response success. + * @param msg - Request. + * @return Response. + */ + public static ContainerCommandResponseProto putKeyResponseSuccess( + ContainerCommandRequestProto msg, long blockLength) { + GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = + getCommittedBlockLengthResponseBuilder(blockLength, + msg.getPutKey().getKeyData().getBlockID()); + PutKeyResponseProto.Builder putKeyResponse = + PutKeyResponseProto.newBuilder(); + putKeyResponse + .setCommittedBlockLength(committedBlockLengthResponseBuilder); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setPutKey(putKeyResponse); + return builder.build(); + } + /** * Returns successful keyResponse. * @param msg - Request. * @return Response. @@ -150,18 +174,26 @@ public final class KeyUtils { * @param msg - Request. * @return Response. */ - public static ContainerProtos.ContainerCommandResponseProto - getBlockLengthResponse(ContainerProtos. - ContainerCommandRequestProto msg, long blockLength) { + public static ContainerCommandResponseProto getBlockLengthResponse( + ContainerCommandRequestProto msg, long blockLength) { + GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = + getCommittedBlockLengthResponseBuilder(blockLength, + msg.getGetCommittedBlockLength().getBlockID()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder); + return builder.build(); + } + + private static GetCommittedBlockLengthResponseProto.Builder + getCommittedBlockLengthResponseBuilder( + long blockLength, ContainerProtos.DatanodeBlockID blockID) { ContainerProtos.GetCommittedBlockLengthResponseProto.Builder getCommittedBlockLengthResponseBuilder = ContainerProtos. GetCommittedBlockLengthResponseProto.newBuilder(); getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); - getCommittedBlockLengthResponseBuilder - .setBlockID(msg.getGetCommittedBlockLength().getBlockID()); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setGetCommittedBlockLength(getCommittedBlockLengthResponseBuilder); - return builder.build(); + getCommittedBlockLengthResponseBuilder.setBlockID(blockID); + return getCommittedBlockLengthResponseBuilder; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java index 58bf1f8..6370f8e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java @@ -67,9 +67,10 @@ public class KeyManagerImpl implements KeyManager { * * @param container - Container for which key need to be added. * @param data - Key Data. + * @return length of the key. * @throws IOException */ - public void putKey(Container container, KeyData data) throws IOException { + public long putKey(Container container, KeyData data) throws IOException { Preconditions.checkNotNull(data, "KeyData cannot be null for put " + "operation."); Preconditions.checkState(data.getContainerID() >= 0, "Container Id " + @@ -87,6 +88,7 @@ public class KeyManagerImpl implements KeyManager { // Increment keycount here container.getContainerData().incrKeyCount(); + return data.getSize(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java index dad688e..37871be 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java @@ -35,9 +35,10 @@ public interface KeyManager { * * @param container - Container for which key need to be added. * @param data - Key Data. + * @return length of the Key. * @throws IOException */ - void putKey(Container container, KeyData data) throws IOException; + long putKey(Container container, KeyData data) throws IOException; /** * Gets an existing key. http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java deleted file mode 100644 index 3c6479f..0000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCommittedBlockLengthAPI.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm; - -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers. - ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers. - StorageContainerException; -import org.apache.hadoop.hdds.scm.container.placement.algorithms. - ContainerPlacementPolicy; -import org.apache.hadoop.hdds.scm.container.placement.algorithms. - SCMContainerPlacementCapacity; -import org.apache.hadoop.hdds.scm.protocolPB. - StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - - -import java.util.UUID; - -/** - * Test Container calls. - */ -public class TestCommittedBlockLengthAPI { - - private static MiniOzoneCluster cluster; - private static OzoneConfiguration ozoneConfig; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private static XceiverClientManager xceiverClientManager; - private static String containerOwner = "OZONE"; - - @BeforeClass - public static void init() throws Exception { - ozoneConfig = new OzoneConfiguration(); - ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); - cluster = - MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build(); - cluster.waitForClusterToBeReady(); - storageContainerLocationClient = - cluster.getStorageContainerLocationClient(); - xceiverClientManager = new XceiverClientManager(ozoneConfig); - } - - @AfterClass - public static void shutdown() throws InterruptedException { - if (cluster != null) { - cluster.shutdown(); - } - IOUtils.cleanupWithLogger(null, storageContainerLocationClient); - } - - @Test - public void tesGetCommittedBlockLength() throws Exception { - ContainerProtos.GetCommittedBlockLengthResponseProto response; - String traceID = UUID.randomUUID().toString(); - ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), - HddsProtos.ReplicationFactor.ONE, containerOwner); - long containerID = container.getContainerInfo().getContainerID(); - Pipeline pipeline = container.getPipeline(); - XceiverClientSpi client = - xceiverClientManager.acquireClient(pipeline, containerID); - //create the container - ContainerProtocolCalls.createContainer(client, containerID, traceID); - - BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); - byte[] data = - RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); - ContainerProtos.ContainerCommandRequestProto writeChunkRequest = - ContainerTestHelper - .getWriteChunkRequest(container.getPipeline(), blockID, - data.length); - client.sendCommand(writeChunkRequest); - // Now, explicitly make a putKey request for the block. - ContainerProtos.ContainerCommandRequestProto putKeyRequest = - ContainerTestHelper - .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk()); - client.sendCommand(putKeyRequest); - response = ContainerProtocolCalls - .getCommittedBlockLength(client, blockID, traceID); - // make sure the block ids in the request and response are same. - Assert.assertTrue( - BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); - Assert.assertTrue(response.getBlockLength() == data.length); - xceiverClientManager.releaseClient(client); - } - - @Test - public void tesGetCommittedBlockLengthWithClosedContainer() - throws Exception { - String traceID = UUID.randomUUID().toString(); - ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), - HddsProtos.ReplicationFactor.ONE, containerOwner); - long containerID = container.getContainerInfo().getContainerID(); - Pipeline pipeline = container.getPipeline(); - XceiverClientSpi client = - xceiverClientManager.acquireClient(pipeline, containerID); - // create the container - ContainerProtocolCalls.createContainer(client, containerID, traceID); - - byte[] data = - RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); - BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); - ContainerProtos.ContainerCommandRequestProto writeChunkRequest = - ContainerTestHelper - .getWriteChunkRequest(container.getPipeline(), blockID, - data.length); - client.sendCommand(writeChunkRequest); - // close the container - ContainerProtocolCalls.closeContainer(client, containerID, traceID); - ContainerProtos.GetCommittedBlockLengthResponseProto response = - ContainerProtocolCalls - .getCommittedBlockLength(client, blockID, traceID); - // make sure the block ids in the request and response are same. - // This will also ensure that closing the container committed the block - // on the Datanodes. - Assert.assertTrue( - BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); - Assert.assertTrue(response.getBlockLength() == data.length); - xceiverClientManager.releaseClient(client); - } - - @Test - public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception { - String traceID = UUID.randomUUID().toString(); - ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), - HddsProtos.ReplicationFactor.ONE, containerOwner); - long containerID = container.getContainerInfo().getContainerID(); - XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), containerID); - ContainerProtocolCalls.createContainer(client, containerID, traceID); - - BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); - // move the container to closed state - ContainerProtocolCalls.closeContainer(client, containerID, traceID); - try { - // There is no block written inside the container. The request should - // fail. - ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID); - Assert.fail("Expected exception not thrown"); - } catch (StorageContainerException sce) { - Assert.assertTrue(sce.getMessage().contains("Unable to find the key")); - } - xceiverClientManager.releaseClient(client); - } - - @Test - public void testGetCommittedBlockLengthForOpenBlock() throws Exception { - String traceID = UUID.randomUUID().toString(); - ContainerWithPipeline container = storageContainerLocationClient - .allocateContainer(xceiverClientManager.getType(), - HddsProtos.ReplicationFactor.ONE, containerOwner); - long containerID = container.getContainerInfo().getContainerID(); - XceiverClientSpi client = xceiverClientManager - .acquireClient(container.getPipeline(), containerID); - ContainerProtocolCalls - .createContainer(client, containerID, traceID); - - BlockID blockID = - ContainerTestHelper.getTestBlockID(containerID); - ContainerProtos.ContainerCommandRequestProto requestProto = - ContainerTestHelper - .getWriteChunkRequest(container.getPipeline(), blockID, 1024); - client.sendCommand(requestProto); - try { - ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID); - Assert.fail("Expected Exception not thrown"); - } catch (StorageContainerException sce) { - Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED, - sce.getResult()); - } - // now close the container, it should auto commit pending open blocks - ContainerProtocolCalls - .closeContainer(client, containerID, traceID); - ContainerProtos.GetCommittedBlockLengthResponseProto response = - ContainerProtocolCalls - .getCommittedBlockLength(client, blockID, traceID); - Assert.assertTrue(response.getBlockLength() == 1024); - xceiverClientManager.releaseClient(client); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/398d8955/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java new file mode 100644 index 0000000..f82b0d3 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers. + ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers. + StorageContainerException; +import org.apache.hadoop.hdds.scm.container.placement.algorithms. + ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms. + SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.scm.protocolPB. + StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + + +import java.util.UUID; + +/** + * Test Container calls. + */ +public class TestGetCommittedBlockLengthAndPutKey { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration ozoneConfig; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static XceiverClientManager xceiverClientManager; + private static String containerOwner = "OZONE"; + + @BeforeClass + public static void init() throws Exception { + ozoneConfig = new OzoneConfiguration(); + ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); + cluster = + MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build(); + cluster.waitForClusterToBeReady(); + storageContainerLocationClient = + cluster.getStorageContainerLocationClient(); + xceiverClientManager = new XceiverClientManager(ozoneConfig); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + if (cluster != null) { + cluster.shutdown(); + } + IOUtils.cleanupWithLogger(null, storageContainerLocationClient); + } + + @Test + public void tesGetCommittedBlockLength() throws Exception { + ContainerProtos.GetCommittedBlockLengthResponseProto response; + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + Pipeline pipeline = container.getPipeline(); + XceiverClientSpi client = + xceiverClientManager.acquireClient(pipeline, containerID); + //create the container + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + byte[] data = + RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, + data.length); + client.sendCommand(writeChunkRequest); + // Now, explicitly make a putKey request for the block. + ContainerProtos.ContainerCommandRequestProto putKeyRequest = + ContainerTestHelper + .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk()); + client.sendCommand(putKeyRequest); + response = ContainerProtocolCalls + .getCommittedBlockLength(client, blockID, traceID); + // make sure the block ids in the request and response are same. + Assert.assertTrue( + BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); + Assert.assertTrue(response.getBlockLength() == data.length); + xceiverClientManager.releaseClient(client); + } + + @Test + public void tesGetCommittedBlockLengthWithClosedContainer() + throws Exception { + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + Pipeline pipeline = container.getPipeline(); + XceiverClientSpi client = + xceiverClientManager.acquireClient(pipeline, containerID); + // create the container + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + byte[] data = + RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, + data.length); + client.sendCommand(writeChunkRequest); + // close the container + ContainerProtocolCalls.closeContainer(client, containerID, traceID); + ContainerProtos.GetCommittedBlockLengthResponseProto response = + ContainerProtocolCalls + .getCommittedBlockLength(client, blockID, traceID); + // make sure the block ids in the request and response are same. + // This will also ensure that closing the container committed the block + // on the Datanodes. + Assert.assertTrue( + BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); + Assert.assertTrue(response.getBlockLength() == data.length); + xceiverClientManager.releaseClient(client); + } + + @Test + public void tesGetCommittedBlockLengthForInvalidBlock() throws Exception { + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + XceiverClientSpi client = xceiverClientManager + .acquireClient(container.getPipeline(), containerID); + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + // move the container to closed state + ContainerProtocolCalls.closeContainer(client, containerID, traceID); + try { + // There is no block written inside the container. The request should + // fail. + ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID); + Assert.fail("Expected exception not thrown"); + } catch (StorageContainerException sce) { + Assert.assertTrue(sce.getMessage().contains("Unable to find the key")); + } + xceiverClientManager.releaseClient(client); + } + + @Test + public void testGetCommittedBlockLengthForOpenBlock() throws Exception { + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + XceiverClientSpi client = xceiverClientManager + .acquireClient(container.getPipeline(), containerID); + ContainerProtocolCalls + .createContainer(client, containerID, traceID); + + BlockID blockID = + ContainerTestHelper.getTestBlockID(containerID); + ContainerProtos.ContainerCommandRequestProto requestProto = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, 1024); + client.sendCommand(requestProto); + try { + ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID); + Assert.fail("Expected Exception not thrown"); + } catch (StorageContainerException sce) { + Assert.assertEquals(ContainerProtos.Result.BLOCK_NOT_COMMITTED, + sce.getResult()); + } + // now close the container, it should auto commit pending open blocks + ContainerProtocolCalls + .closeContainer(client, containerID, traceID); + ContainerProtos.GetCommittedBlockLengthResponseProto response = + ContainerProtocolCalls + .getCommittedBlockLength(client, blockID, traceID); + Assert.assertTrue(response.getBlockLength() == 1024); + xceiverClientManager.releaseClient(client); + } + + @Test + public void tesPutKeyResposne() throws Exception { + ContainerProtos.PutKeyResponseProto response; + String traceID = UUID.randomUUID().toString(); + ContainerWithPipeline container = storageContainerLocationClient + .allocateContainer(xceiverClientManager.getType(), + HddsProtos.ReplicationFactor.ONE, containerOwner); + long containerID = container.getContainerInfo().getContainerID(); + Pipeline pipeline = container.getPipeline(); + XceiverClientSpi client = + xceiverClientManager.acquireClient(pipeline, containerID); + //create the container + ContainerProtocolCalls.createContainer(client, containerID, traceID); + + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + byte[] data = + RandomStringUtils.random(RandomUtils.nextInt(0, 1024)).getBytes(); + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper + .getWriteChunkRequest(container.getPipeline(), blockID, + data.length); + client.sendCommand(writeChunkRequest); + // Now, explicitly make a putKey request for the block. + ContainerProtos.ContainerCommandRequestProto putKeyRequest = + ContainerTestHelper + .getPutKeyRequest(pipeline, writeChunkRequest.getWriteChunk()); + response = client.sendCommand(putKeyRequest).getPutKey(); + // make sure the block ids in the request and response are same. + // This will also ensure that closing the container committed the block + // on the Datanodes. + Assert.assertEquals(BlockID + .getFromProtobuf(response.getCommittedBlockLength().getBlockID()), + blockID); + Assert.assertEquals( + response.getCommittedBlockLength().getBlockLength(), data.length); + xceiverClientManager.releaseClient(client); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org