HDDS-394. Rename *Key APIs in DatanodeContainerProtocol to *Block APIs. Contributed by Dinesh Chitlangia.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/096a7160 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/096a7160 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/096a7160 Branch: refs/heads/YARN-1011 Commit: 096a7160803494219581c067dfcdb67d2bd0bcdb Parents: aa4bd49 Author: Anu Engineer <[email protected]> Authored: Thu Sep 20 11:51:49 2018 -0700 Committer: Anu Engineer <[email protected]> Committed: Thu Sep 20 11:51:49 2018 -0700 ---------------------------------------------------------------------- .../hdds/scm/storage/ChunkOutputStream.java | 13 +- .../java/org/apache/hadoop/hdds/HddsUtils.java | 8 +- .../scm/storage/ContainerProtocolCalls.java | 62 ++--- .../container/common/helpers/BlockData.java | 255 +++++++++++++++++++ .../ozone/container/common/helpers/KeyData.java | 253 ------------------ .../main/proto/DatanodeContainerProtocol.proto | 74 +++--- .../common/impl/OpenContainerBlockMap.java | 46 ++-- .../DeleteBlocksCommandHandler.java | 4 +- .../server/ratis/ContainerStateMachine.java | 28 +- .../keyvalue/KeyValueBlockIterator.java | 16 +- .../container/keyvalue/KeyValueContainer.java | 4 +- .../container/keyvalue/KeyValueHandler.java | 124 ++++----- .../container/keyvalue/helpers/BlockUtils.java | 199 +++++++++++++++ .../container/keyvalue/helpers/KeyUtils.java | 199 --------------- .../keyvalue/helpers/KeyValueContainerUtil.java | 12 +- .../keyvalue/helpers/SmallFileUtils.java | 2 +- .../keyvalue/impl/BlockManagerImpl.java | 229 +++++++++++++++++ .../container/keyvalue/impl/KeyManagerImpl.java | 227 ----------------- .../container/keyvalue/impl/package-info.java | 5 +- .../keyvalue/interfaces/BlockManager.java | 84 ++++++ .../keyvalue/interfaces/KeyManager.java | 84 ------ .../keyvalue/interfaces/package-info.java | 21 ++ .../background/BlockDeletingService.java | 10 +- .../keyvalue/TestBlockManagerImpl.java | 211 +++++++++++++++ 
.../keyvalue/TestChunkManagerImpl.java | 2 +- .../container/keyvalue/TestKeyManagerImpl.java | 191 -------------- .../keyvalue/TestKeyValueBlockIterator.java | 30 +-- .../keyvalue/TestKeyValueContainer.java | 26 +- .../container/keyvalue/TestKeyValueHandler.java | 38 +-- .../ozone/client/io/ChunkGroupInputStream.java | 6 +- .../TestStorageContainerManagerHelper.java | 8 +- .../ozone/client/rest/TestOzoneRestClient.java | 8 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 8 +- .../ozone/container/ContainerTestHelper.java | 84 +++--- .../container/TestContainerReplication.java | 24 +- .../common/TestBlockDeletingService.java | 12 +- .../container/common/helpers/TestBlockData.java | 127 +++++++++ .../container/common/helpers/TestKeyData.java | 119 --------- .../common/impl/TestCloseContainerHandler.java | 51 ++-- .../common/impl/TestContainerPersistence.java | 154 +++++------ .../commandhandler/TestBlockDeletion.java | 9 +- .../container/ozoneimpl/TestOzoneContainer.java | 100 ++++---- .../server/TestContainerStateMachine.java | 2 +- .../hadoop/ozone/om/TestOzoneManager.java | 4 +- .../ozone/scm/TestContainerSmallFile.java | 4 +- .../TestGetCommittedBlockLengthAndPutKey.java | 12 +- .../hadoop/ozone/web/client/TestKeys.java | 44 ++-- .../hadoop/ozone/om/BucketManagerImpl.java | 2 +- .../ozone/om/ScmBlockLocationTestIngClient.java | 2 +- .../genesis/BenchMarkDatanodeDispatcher.java | 42 +-- 50 files changed, 1680 insertions(+), 1599 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index 8d311d0..10b3bb5 100644 --- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -23,7 +23,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.client.BlockID; @@ -32,7 +32,8 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.UUID; -import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls + .putBlock; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls .writeChunk; @@ -57,7 +58,7 @@ public class ChunkOutputStream extends OutputStream { private final BlockID blockID; private final String key; private final String traceID; - private final KeyData.Builder containerKeyData; + private final BlockData.Builder containerBlockData; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; private ByteBuffer buffer; @@ -84,7 +85,7 @@ public class ChunkOutputStream extends OutputStream { this.chunkSize = chunkSize; KeyValue keyValue = KeyValue.newBuilder() .setKey("TYPE").setValue("KEY").build(); - this.containerKeyData = KeyData.newBuilder() + this.containerBlockData = BlockData.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; @@ -154,7 +155,7 @@ public class ChunkOutputStream extends OutputStream { writeChunkToContainer(); } try { - 
putKey(xceiverClient, containerKeyData.build(), traceID); + putBlock(xceiverClient, containerBlockData.build(), traceID); } catch (IOException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); @@ -230,6 +231,6 @@ public class ChunkOutputStream extends OutputStream { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } - containerKeyData.addChunks(chunk); + containerBlockData.addChunks(chunk); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index 33bf90c..db9d374 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -326,8 +326,8 @@ public final class HddsUtils { switch (proto.getCmdType()) { case ReadContainer: case ReadChunk: - case ListKey: - case GetKey: + case ListBlock: + case GetBlock: case GetSmallFile: case ListContainer: case ListChunk: @@ -340,8 +340,8 @@ public final class HddsUtils { case CreateContainer: case DeleteChunk: case DeleteContainer: - case DeleteKey: - case PutKey: + case DeleteBlock: + case PutBlock: case PutSmallFile: default: return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 1d6a89d..6b7a328 100644 --- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -35,16 +35,16 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetKeyRequestProto; + .GetBlockRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetKeyResponseProto; + .GetBlockResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .GetSmallFileRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .GetSmallFileResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .PutKeyRequestProto; + .PutBlockRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .PutSmallFileRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -76,33 +76,33 @@ public final class ContainerProtocolCalls { } /** - * Calls the container protocol to get a container key. + * Calls the container protocol to get a container block. 
* * @param xceiverClient client to perform call * @param datanodeBlockID blockID to identify container * @param traceID container protocol call args - * @return container protocol get key response + * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ - public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient, + public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, DatanodeBlockID datanodeBlockID, String traceID) throws IOException { - GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto + GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() .setBlockID(datanodeBlockID); String id = xceiverClient.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() - .setCmdType(Type.GetKey) + .setCmdType(Type.GetBlock) .setContainerID(datanodeBlockID.getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setGetKey(readKeyRequest) + .setGetBlock(readBlockRequest) .build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); - return response.getGetKey(); + return response.getGetBlock(); } /** @@ -136,26 +136,26 @@ public final class ContainerProtocolCalls { } /** - * Calls the container protocol to put a container key. + * Calls the container protocol to put a container block. 
* * @param xceiverClient client to perform call - * @param containerKeyData key data to identify container + * @param containerBlockData block data to identify container * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ - public static void putKey(XceiverClientSpi xceiverClient, - KeyData containerKeyData, String traceID) throws IOException { - PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto + public static void putBlock(XceiverClientSpi xceiverClient, + BlockData containerBlockData, String traceID) throws IOException { + PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto .newBuilder() - .setKeyData(containerKeyData); + .setBlockData(containerBlockData); String id = xceiverClient.getPipeline().getLeader().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() - .setCmdType(Type.PutKey) - .setContainerID(containerKeyData.getBlockID().getContainerID()) + .setCmdType(Type.PutBlock) + .setContainerID(containerBlockData.getBlockID().getContainerID()) .setTraceID(traceID) .setDatanodeUuid(id) - .setPutKey(createKeyRequest) + .setPutBlock(createBlockRequest) .build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); validateContainerResponse(response); @@ -224,9 +224,9 @@ public final class ContainerProtocolCalls { /** * Allows writing a small file using single RPC. This takes the container - * name, key name and data to write sends all that data to the container using - * a single RPC. This API is designed to be used for files which are smaller - * than 1 MB. + * name, block name and data to write sends all that data to the container + * using a single RPC. This API is designed to be used for files which are + * smaller than 1 MB. * * @param client - client that communicates with the container. 
* @param blockID - ID of the block @@ -238,12 +238,12 @@ public final class ContainerProtocolCalls { BlockID blockID, byte[] data, String traceID) throws IOException { - KeyData containerKeyData = - KeyData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) + BlockData containerBlockData = + BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) .build(); - PutKeyRequestProto.Builder createKeyRequest = - PutKeyRequestProto.newBuilder() - .setKeyData(containerKeyData); + PutBlockRequestProto.Builder createBlockRequest = + PutBlockRequestProto.newBuilder() + .setBlockData(containerBlockData); KeyValue keyValue = KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true") @@ -255,7 +255,7 @@ public final class ContainerProtocolCalls { PutSmallFileRequestProto putSmallFileRequest = PutSmallFileRequestProto.newBuilder().setChunkInfo(chunk) - .setKey(createKeyRequest).setData(ByteString.copyFrom(data)) + .setBlock(createBlockRequest).setData(ByteString.copyFrom(data)) .build(); String id = client.getPipeline().getLeader().getUuidString(); @@ -387,12 +387,12 @@ public final class ContainerProtocolCalls { */ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, BlockID blockID, String traceID) throws IOException { - GetKeyRequestProto.Builder getKey = GetKeyRequestProto + GetBlockRequestProto.Builder getBlock = GetBlockRequestProto .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()); ContainerProtos.GetSmallFileRequestProto getSmallFileRequest = GetSmallFileRequestProto - .newBuilder().setKey(getKey) + .newBuilder().setBlock(getBlock) .build(); String id = client.getPipeline().getLeader().getUuidString(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java new file mode 100644 index 0000000..0c1d427 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.client.BlockID; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.ArrayList; + +/** + * Helper class to convert Protobuf to Java classes. + */ +public class BlockData { + private final BlockID blockID; + private final Map<String, String> metadata; + + /** + * Represent a list of chunks. + * In order to reduce memory usage, chunkList is declared as an + * {@link Object}. + * When #elements == 0, chunkList is null. + * When #elements == 1, chunkList refers to the only element. 
+ * When #elements > 1, chunkList refers to the list. + * + * Please note : when we are working with blocks, we don't care what they + * point to. So we We don't read chunkinfo nor validate them. It is + * responsibility of higher layer like ozone. We just read and write data + * from network. + */ + private Object chunkList; + + /** + * total size of the key. + */ + private long size; + + /** + * Constructs a BlockData Object. + * + * @param blockID + */ + public BlockData(BlockID blockID) { + this.blockID = blockID; + this.metadata = new TreeMap<>(); + this.size = 0; + } + + /** + * Returns a blockData object from the protobuf data. + * + * @param data - Protobuf data. + * @return - BlockData + * @throws IOException + */ + public static BlockData getFromProtoBuf(ContainerProtos.BlockData data) throws + IOException { + BlockData blockData = new BlockData( + BlockID.getFromProtobuf(data.getBlockID())); + for (int x = 0; x < data.getMetadataCount(); x++) { + blockData.addMetadata(data.getMetadata(x).getKey(), + data.getMetadata(x).getValue()); + } + blockData.setChunks(data.getChunksList()); + if (data.hasSize()) { + Preconditions.checkArgument(data.getSize() == blockData.getSize()); + } + return blockData; + } + + /** + * Returns a Protobuf message from BlockData. + * @return Proto Buf Message. + */ + public ContainerProtos.BlockData getProtoBufMessage() { + ContainerProtos.BlockData.Builder builder = + ContainerProtos.BlockData.newBuilder(); + builder.setBlockID(this.blockID.getDatanodeBlockIDProtobuf()); + for (Map.Entry<String, String> entry : metadata.entrySet()) { + ContainerProtos.KeyValue.Builder keyValBuilder = + ContainerProtos.KeyValue.newBuilder(); + builder.addMetadata(keyValBuilder.setKey(entry.getKey()) + .setValue(entry.getValue()).build()); + } + builder.addAllChunks(getChunks()); + builder.setSize(size); + return builder.build(); + } + + /** + * Adds metadata. 
+ * + * @param key - Key + * @param value - Value + * @throws IOException + */ + public synchronized void addMetadata(String key, String value) throws + IOException { + if (this.metadata.containsKey(key)) { + throw new IOException("This key already exists. Key " + key); + } + metadata.put(key, value); + } + + public synchronized Map<String, String> getMetadata() { + return Collections.unmodifiableMap(this.metadata); + } + + /** + * Returns value of a key. + */ + public synchronized String getValue(String key) { + return metadata.get(key); + } + + /** + * Deletes a metadata entry from the map. + * + * @param key - Key + */ + public synchronized void deleteKey(String key) { + metadata.remove(key); + } + + @SuppressWarnings("unchecked") + private List<ContainerProtos.ChunkInfo> castChunkList() { + return (List<ContainerProtos.ChunkInfo>)chunkList; + } + + /** + * Returns chunks list. + * + * @return list of chunkinfo. + */ + public List<ContainerProtos.ChunkInfo> getChunks() { + return chunkList == null? Collections.emptyList() + : chunkList instanceof ContainerProtos.ChunkInfo? + Collections.singletonList((ContainerProtos.ChunkInfo)chunkList) + : Collections.unmodifiableList(castChunkList()); + } + + /** + * Adds chinkInfo to the list. + */ + public void addChunk(ContainerProtos.ChunkInfo chunkInfo) { + if (chunkList == null) { + chunkList = chunkInfo; + } else { + final List<ContainerProtos.ChunkInfo> list; + if (chunkList instanceof ContainerProtos.ChunkInfo) { + list = new ArrayList<>(2); + list.add((ContainerProtos.ChunkInfo)chunkList); + chunkList = list; + } else { + list = castChunkList(); + } + list.add(chunkInfo); + } + size += chunkInfo.getLen(); + } + + /** + * removes the chunk. 
+ */ + public boolean removeChunk(ContainerProtos.ChunkInfo chunkInfo) { + final boolean removed; + if (chunkList instanceof List) { + final List<ContainerProtos.ChunkInfo> list = castChunkList(); + removed = list.remove(chunkInfo); + if (list.size() == 1) { + chunkList = list.get(0); + } + } else if (chunkInfo.equals(chunkList)) { + chunkList = null; + removed = true; + } else { + removed = false; + } + + if (removed) { + size -= chunkInfo.getLen(); + } + return removed; + } + + /** + * Returns container ID. + * + * @return long. + */ + public long getContainerID() { + return blockID.getContainerID(); + } + + /** + * Returns LocalID. + * @return long. + */ + public long getLocalID() { + return blockID.getLocalID(); + } + + /** + * Return Block ID. + * @return BlockID. + */ + public BlockID getBlockID() { + return blockID; + } + + /** + * Sets Chunk list. + * + * @param chunks - List of chunks. + */ + public void setChunks(List<ContainerProtos.ChunkInfo> chunks) { + if (chunks == null) { + chunkList = null; + size = 0L; + } else { + final int n = chunks.size(); + chunkList = n == 0? null: n == 1? chunks.get(0): chunks; + size = chunks.parallelStream().mapToLong( + ContainerProtos.ChunkInfo::getLen).sum(); + } + } + + /** + * Get the total size of chunks allocated for the key. + * @return total size of the key. 
+ */ + public long getSize() { + return size; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java deleted file mode 100644 index ee27021..0000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.common.helpers; - -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.client.BlockID; -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.ArrayList; - -/** - * Helper class to convert Protobuf to Java classes. 
- */ -public class KeyData { - private final BlockID blockID; - private final Map<String, String> metadata; - - /** - * Represent a list of chunks. - * In order to reduce memory usage, chunkList is declared as an - * {@link Object}. - * When #elements == 0, chunkList is null. - * When #elements == 1, chunkList refers to the only element. - * When #elements > 1, chunkList refers to the list. - * - * Please note : when we are working with keys, we don't care what they point - * to. So we We don't read chunkinfo nor validate them. It is responsibility - * of higher layer like ozone. We just read and write data from network. - */ - private Object chunkList; - - /** - * total size of the key. - */ - private long size; - - /** - * Constructs a KeyData Object. - * - * @param blockID - */ - public KeyData(BlockID blockID) { - this.blockID = blockID; - this.metadata = new TreeMap<>(); - this.size = 0; - } - - /** - * Returns a keyData object from the protobuf data. - * - * @param data - Protobuf data. - * @return - KeyData - * @throws IOException - */ - public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws - IOException { - KeyData keyData = new KeyData(BlockID.getFromProtobuf(data.getBlockID())); - for (int x = 0; x < data.getMetadataCount(); x++) { - keyData.addMetadata(data.getMetadata(x).getKey(), - data.getMetadata(x).getValue()); - } - keyData.setChunks(data.getChunksList()); - if (data.hasSize()) { - Preconditions.checkArgument(data.getSize() == keyData.getSize()); - } - return keyData; - } - - /** - * Returns a Protobuf message from KeyData. - * @return Proto Buf Message. 
- */ - public ContainerProtos.KeyData getProtoBufMessage() { - ContainerProtos.KeyData.Builder builder = - ContainerProtos.KeyData.newBuilder(); - builder.setBlockID(this.blockID.getDatanodeBlockIDProtobuf()); - for (Map.Entry<String, String> entry : metadata.entrySet()) { - ContainerProtos.KeyValue.Builder keyValBuilder = - ContainerProtos.KeyValue.newBuilder(); - builder.addMetadata(keyValBuilder.setKey(entry.getKey()) - .setValue(entry.getValue()).build()); - } - builder.addAllChunks(getChunks()); - builder.setSize(size); - return builder.build(); - } - - /** - * Adds metadata. - * - * @param key - Key - * @param value - Value - * @throws IOException - */ - public synchronized void addMetadata(String key, String value) throws - IOException { - if (this.metadata.containsKey(key)) { - throw new IOException("This key already exists. Key " + key); - } - metadata.put(key, value); - } - - public synchronized Map<String, String> getMetadata() { - return Collections.unmodifiableMap(this.metadata); - } - - /** - * Returns value of a key. - */ - public synchronized String getValue(String key) { - return metadata.get(key); - } - - /** - * Deletes a metadata entry from the map. - * - * @param key - Key - */ - public synchronized void deleteKey(String key) { - metadata.remove(key); - } - - @SuppressWarnings("unchecked") - private List<ContainerProtos.ChunkInfo> castChunkList() { - return (List<ContainerProtos.ChunkInfo>)chunkList; - } - - /** - * Returns chunks list. - * - * @return list of chunkinfo. - */ - public List<ContainerProtos.ChunkInfo> getChunks() { - return chunkList == null? Collections.emptyList() - : chunkList instanceof ContainerProtos.ChunkInfo? - Collections.singletonList((ContainerProtos.ChunkInfo)chunkList) - : Collections.unmodifiableList(castChunkList()); - } - - /** - * Adds chinkInfo to the list. 
- */ - public void addChunk(ContainerProtos.ChunkInfo chunkInfo) { - if (chunkList == null) { - chunkList = chunkInfo; - } else { - final List<ContainerProtos.ChunkInfo> list; - if (chunkList instanceof ContainerProtos.ChunkInfo) { - list = new ArrayList<>(2); - list.add((ContainerProtos.ChunkInfo)chunkList); - chunkList = list; - } else { - list = castChunkList(); - } - list.add(chunkInfo); - } - size += chunkInfo.getLen(); - } - - /** - * removes the chunk. - */ - public boolean removeChunk(ContainerProtos.ChunkInfo chunkInfo) { - final boolean removed; - if (chunkList instanceof List) { - final List<ContainerProtos.ChunkInfo> list = castChunkList(); - removed = list.remove(chunkInfo); - if (list.size() == 1) { - chunkList = list.get(0); - } - } else if (chunkInfo.equals(chunkList)) { - chunkList = null; - removed = true; - } else { - removed = false; - } - - if (removed) { - size -= chunkInfo.getLen(); - } - return removed; - } - - /** - * Returns container ID. - * - * @return long. - */ - public long getContainerID() { - return blockID.getContainerID(); - } - - /** - * Returns LocalID. - * @return long. - */ - public long getLocalID() { - return blockID.getLocalID(); - } - - /** - * Return Block ID. - * @return BlockID. - */ - public BlockID getBlockID() { - return blockID; - } - - /** - * Sets Chunk list. - * - * @param chunks - List of chunks. - */ - public void setChunks(List<ContainerProtos.ChunkInfo> chunks) { - if (chunks == null) { - chunkList = null; - size = 0L; - } else { - final int n = chunks.size(); - chunkList = n == 0? null: n == 1? chunks.get(0): chunks; - size = chunks.parallelStream().mapToLong( - ContainerProtos.ChunkInfo::getLen).sum(); - } - } - - /** - * Get the total size of chunks allocated for the key. - * @return total size of the key. 
- */ - public long getSize() { - return size; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index ba0d2d4..7be8a62 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -49,13 +49,13 @@ package hadoop.hdds.datanode; * 5. ListContainer - Returns the list of containers on this * datanode. This will be used by tests and tools. * - * 6. PutKey - Given a valid container, creates a key. + * 6. PutBlock - Given a valid container, creates a block. * - * 7. GetKey - Allows user to read the metadata of a Key. + * 7. GetBlock - Allows user to read the metadata of a block. * - * 8. DeleteKey - Deletes a given key. + * 8. DeleteBlock - Deletes a given block. * - * 9. ListKey - Returns a list of keys that are present inside + * 9. ListBlock - Returns a list of blocks that are present inside * a given container. * * 10. ReadChunk - Allows us to read a chunk. @@ -64,13 +64,13 @@ package hadoop.hdds.datanode; * * 12. WriteChunk - Allows us to write a chunk * - * 13. ListChunk - Given a Container/Key returns the list of Chunks. + * 13. ListChunk - Given a Container/Block returns the list of Chunks. * * 14. CompactChunk - Re-writes a chunk based on Offsets. * - * 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk. + * 15. PutSmallFile - A single RPC that combines both putBlock and WriteChunk. * - * 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk. + * 16. GetSmallFile - A single RPC that combines both getBlock and ReadChunk. * * 17. CloseContainer - Closes an open container and makes it immutable. 
* @@ -84,10 +84,10 @@ enum Type { DeleteContainer = 4; ListContainer = 5; - PutKey = 6; - GetKey = 7; - DeleteKey = 8; - ListKey = 9; + PutBlock = 6; + GetBlock = 7; + DeleteBlock = 8; + ListBlock = 9; ReadChunk = 10; DeleteChunk = 11; @@ -95,7 +95,7 @@ enum Type { ListChunk = 13; CompactChunk = 14; - /** Combines Key and Chunk Operation into Single RPC. */ + /** Combines Block and Chunk Operation into Single RPC. */ PutSmallFile = 15; GetSmallFile = 16; CloseContainer = 17; @@ -115,7 +115,7 @@ enum Result { CONTAINER_NOT_FOUND = 9; IO_EXCEPTION = 10; UNABLE_TO_READ_METADATA_DB = 11; - NO_SUCH_KEY = 12; + NO_SUCH_BLOCK = 12; OVERWRITE_FLAG_REQUIRED = 13; UNABLE_TO_FIND_DATA_DIR = 14; INVALID_WRITE_SIZE = 15; @@ -185,10 +185,10 @@ message ContainerCommandRequestProto { optional ListContainerRequestProto listContainer = 9; optional CloseContainerRequestProto closeContainer = 10; - optional PutKeyRequestProto putKey = 11; - optional GetKeyRequestProto getKey = 12; - optional DeleteKeyRequestProto deleteKey = 13; - optional ListKeyRequestProto listKey = 14; + optional PutBlockRequestProto putBlock = 11; + optional GetBlockRequestProto getBlock = 12; + optional DeleteBlockRequestProto deleteBlock = 13; + optional ListBlockRequestProto listBlock = 14; optional ReadChunkRequestProto readChunk = 15; optional WriteChunkRequestProto writeChunk = 16; @@ -215,10 +215,10 @@ message ContainerCommandResponseProto { optional ListContainerResponseProto listContainer = 9; optional CloseContainerResponseProto closeContainer = 10; - optional PutKeyResponseProto putKey = 11; - optional GetKeyResponseProto getKey = 12; - optional DeleteKeyResponseProto deleteKey = 13; - optional ListKeyResponseProto listKey = 14; + optional PutBlockResponseProto putBlock = 11; + optional GetBlockResponseProto getBlock = 12; + optional DeleteBlockResponseProto deleteBlock = 13; + optional ListBlockResponseProto listBlock = 14; optional WriteChunkResponseProto writeChunk = 15; optional 
ReadChunkResponseProto readChunk = 16; @@ -294,7 +294,7 @@ message CloseContainerResponseProto { optional int64 containerID = 2; } -message KeyData { +message BlockData { required DatanodeBlockID blockID = 1; optional int64 flags = 2; // for future use. repeated KeyValue metadata = 3; @@ -302,25 +302,25 @@ message KeyData { optional int64 size = 5; } -// Key Messages. -message PutKeyRequestProto { - required KeyData keyData = 1; +// Block Messages. +message PutBlockRequestProto { + required BlockData blockData = 1; } -message PutKeyResponseProto { +message PutBlockResponseProto { required GetCommittedBlockLengthResponseProto committedBlockLength = 1; } -message GetKeyRequestProto { +message GetBlockRequestProto { required DatanodeBlockID blockID = 1; } -message GetKeyResponseProto { - required KeyData keyData = 1; +message GetBlockResponseProto { + required BlockData blockData = 1; } -message DeleteKeyRequestProto { +message DeleteBlockRequestProto { required DatanodeBlockID blockID = 1; } @@ -333,17 +333,17 @@ message GetCommittedBlockLengthResponseProto { required int64 blockLength = 2; } -message DeleteKeyResponseProto { +message DeleteBlockResponseProto { } -message ListKeyRequestProto { +message ListBlockRequestProto { optional int64 startLocalID = 2; required uint32 count = 3; } -message ListKeyResponseProto { - repeated KeyData keyData = 1; +message ListBlockResponseProto { + repeated BlockData blockData = 1; } // Chunk Operations @@ -401,11 +401,11 @@ message ListChunkResponseProto { repeated ChunkInfo chunkData = 1; } -/** For small file access combines write chunk and putKey into a single +/** For small file access combines write chunk and putBlock into a single RPC */ message PutSmallFileRequestProto { - required PutKeyRequestProto key = 1; + required PutBlockRequestProto block = 1; required ChunkInfo chunkInfo = 2; required bytes data = 3; } @@ -416,7 +416,7 @@ message PutSmallFileResponseProto { } message GetSmallFileRequestProto { - required 
GetKeyRequestProto key = 1; + required GetBlockRequestProto block = 1; } message GetSmallFileResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java index 1ef3d0d..b736eb5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/OpenContainerBlockMap.java @@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import java.util.ArrayList; import java.util.Collections; @@ -33,9 +33,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Function; /** - * Map: containerId -> (localId -> {@link KeyData}). + * Map: containerId {@literal ->} (localId {@literal ->} {@link BlockData}). * The outer container map does not entail locking for a better performance. - * The inner {@link KeyDataMap} is synchronized. + * The inner {@link BlockDataMap} is synchronized. 
* * This class will maintain list of open keys per container when closeContainer * command comes, it should autocommit all open keys of a open container before @@ -43,16 +43,16 @@ import java.util.function.Function; */ public class OpenContainerBlockMap { /** - * Map: localId -> KeyData. + * Map: localId {@literal ->} BlockData. * * In order to support {@link #getAll()}, the update operations are * synchronized. */ - static class KeyDataMap { - private final ConcurrentMap<Long, KeyData> blocks = + static class BlockDataMap { + private final ConcurrentMap<Long, BlockData> blocks = new ConcurrentHashMap<>(); - KeyData get(long localId) { + BlockData get(long localId) { return blocks.get(localId); } @@ -61,12 +61,12 @@ public class OpenContainerBlockMap { return blocks.size(); } - synchronized KeyData computeIfAbsent( - long localId, Function<Long, KeyData> f) { + synchronized BlockData computeIfAbsent( + long localId, Function<Long, BlockData> f) { return blocks.computeIfAbsent(localId, f); } - synchronized List<KeyData> getAll() { + synchronized List<BlockData> getAll() { return new ArrayList<>(blocks.values()); } } @@ -79,7 +79,7 @@ public class OpenContainerBlockMap { * * For now, we will track all open blocks of a container in the blockMap. 
*/ - private final ConcurrentMap<Long, KeyDataMap> containers = + private final ConcurrentMap<Long, BlockDataMap> containers = new ConcurrentHashMap<>(); /** @@ -94,9 +94,9 @@ public class OpenContainerBlockMap { public void addChunk(BlockID blockID, ChunkInfo info) { Preconditions.checkNotNull(info); - containers.computeIfAbsent(blockID.getContainerID(), id -> new KeyDataMap()) - .computeIfAbsent(blockID.getLocalID(), id -> new KeyData(blockID)) - .addChunk(info); + containers.computeIfAbsent(blockID.getContainerID(), + id -> new BlockDataMap()).computeIfAbsent(blockID.getLocalID(), + id -> new BlockData(blockID)).addChunk(info); } /** @@ -113,21 +113,21 @@ public class OpenContainerBlockMap { } /** - * Returns the list of open to the openContainerBlockMap. + * Returns the list of open blocks to the openContainerBlockMap. * @param containerId container id - * @return List of open Keys(blocks) + * @return List of open blocks */ - public List<KeyData> getOpenKeys(long containerId) { + public List<BlockData> getOpenBlocks(long containerId) { return Optional.ofNullable(containers.get(containerId)) - .map(KeyDataMap::getAll) + .map(BlockDataMap::getAll) .orElseGet(Collections::emptyList); } /** * removes the block from the block map. - * @param blockID + * @param blockID - block ID */ - public void removeFromKeyMap(BlockID blockID) { + public void removeFromBlockMap(BlockID blockID) { Preconditions.checkNotNull(blockID); containers.computeIfPresent(blockID.getContainerID(), (containerId, blocks) -> blocks.removeAndGetSize(blockID.getLocalID()) == 0? null: blocks); @@ -136,16 +136,16 @@ public class OpenContainerBlockMap { /** * Returns true if the block exists in the map, false otherwise. * - * @param blockID + * @param blockID - Block ID. 
* @return True, if it exists, false otherwise */ public boolean checkIfBlockExists(BlockID blockID) { - KeyDataMap keyDataMap = containers.get(blockID.getContainerID()); + BlockDataMap keyDataMap = containers.get(blockID.getContainerID()); return keyDataMap != null && keyDataMap.get(blockID.getLocalID()) != null; } @VisibleForTesting - KeyDataMap getKeyDataMap(long containerId) { + BlockDataMap getBlockDataMap(long containerId) { return containers.get(containerId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index b0d4cbc..430b0ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.container.common.helpers .DeletedContainerBlocksSummary; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; @@ -199,7 +199,7 @@ public class DeleteBlocksCommandHandler implements 
CommandHandler { } int newDeletionBlocks = 0; - MetadataStore containerDB = KeyUtils.getDB(containerData, conf); + MetadataStore containerDB = BlockUtils.getDB(containerData, conf); for (Long blk : delTX.getLocalIDList()) { BatchOperation batch = new BatchOperation(); byte[] blkBytes = Longs.toByteArray(blk); http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index b84db66..a7bef86 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -103,10 +103,10 @@ import java.util.stream.Collectors; * implementation. For example, synchronization between writeChunk and * createContainer in {@link ContainerStateMachine}. * - * PutKey is synchronized with WriteChunk operations, PutKey for a block is - * executed only after all the WriteChunk preceding the PutKey have finished. + * PutBlock is synchronized with WriteChunk operations, PutBlock for a block is + * executed only after all the WriteChunk preceding the PutBlock have finished. * - * CloseContainer is synchronized with WriteChunk and PutKey operations, + * CloseContainer is synchronized with WriteChunk and PutBlock operations, * CloseContainer for a container is processed after all the preceding write * operations for the container have finished. 
* */ @@ -443,7 +443,7 @@ public class ContainerStateMachine extends BaseStateMachine { /** * This class maintains maps and provide utilities to enforce synchronization - * among createContainer, writeChunk, putKey and closeContainer. + * among createContainer, writeChunk, putBlock and closeContainer. */ private class StateMachineHelper { @@ -453,7 +453,7 @@ public class ContainerStateMachine extends BaseStateMachine { private final ConcurrentHashMap<Long, CommitChunkFutureMap> block2ChunkMap; - // Map for putKey futures + // Map for putBlock futures private final ConcurrentHashMap<Long, CompletableFuture<Message>> blockCommitMap; @@ -505,11 +505,11 @@ public class ContainerStateMachine extends BaseStateMachine { // The following section handles applyTransaction transactions // on a container - private CompletableFuture<Message> handlePutKey( + private CompletableFuture<Message> handlePutBlock( ContainerCommandRequestProto requestProto) { List<CompletableFuture<Message>> futureList = new ArrayList<>(); long localId = - requestProto.getPutKey().getKeyData().getBlockID().getLocalID(); + requestProto.getPutBlock().getBlockData().getBlockID().getLocalID(); // Need not wait for create container future here as it has already // finished. if (block2ChunkMap.get(localId) != null) { @@ -518,18 +518,18 @@ public class ContainerStateMachine extends BaseStateMachine { CompletableFuture<Message> effectiveFuture = runCommandAfterFutures(futureList, requestProto); - CompletableFuture<Message> putKeyFuture = + CompletableFuture<Message> putBlockFuture = effectiveFuture.thenApply(message -> { blockCommitMap.remove(localId); return message; }); - blockCommitMap.put(localId, putKeyFuture); - return putKeyFuture; + blockCommitMap.put(localId, putBlockFuture); + return putBlockFuture; } // Close Container should be executed only if all pending WriteType // container cmds get executed. Transactions which can return a future - // are WriteChunk and PutKey. 
+ // are WriteChunk and PutBlock. private CompletableFuture<Message> handleCloseContainer( ContainerCommandRequestProto requestProto) { List<CompletableFuture<Message>> futureList = new ArrayList<>(); @@ -539,7 +539,7 @@ public class ContainerStateMachine extends BaseStateMachine { block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll())); futureList.addAll(blockCommitMap.values()); - // There are pending write Chunk/PutKey type requests + // There are pending write Chunk/PutBlock type requests // Queue this closeContainer request behind all these requests CompletableFuture<Message> closeContainerFuture = runCommandAfterFutures(futureList, requestProto); @@ -615,8 +615,8 @@ public class ContainerStateMachine extends BaseStateMachine { return handleChunkCommit(requestProto, index); case CloseContainer: return handleCloseContainer(requestProto); - case PutKey: - return handlePutKey(requestProto); + case PutBlock: + return handlePutBlock(requestProto); case CreateContainer: return handleCreateContainer(requestProto); default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java index f800223..535af29 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueBlockIterator.java @@ -21,12 +21,12 @@ package org.apache.hadoop.ozone.container.keyvalue; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import 
org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; import org.apache.hadoop.utils.MetaStoreIterator; import org.apache.hadoop.utils.MetadataKeyFilters; @@ -48,7 +48,7 @@ import java.util.NoSuchElementException; * {@link MetadataKeyFilters#getNormalKeyFilter()} */ @InterfaceAudience.Public -public class KeyValueBlockIterator implements BlockIterator<KeyData> { +public class KeyValueBlockIterator implements BlockIterator<BlockData> { private static final Logger LOG = LoggerFactory.getLogger( KeyValueBlockIterator.class); @@ -57,7 +57,7 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> { private static KeyPrefixFilter defaultBlockFilter = MetadataKeyFilters .getNormalKeyFilter(); private KeyPrefixFilter blockFilter; - private KeyData nextBlock; + private BlockData nextBlock; private long containerId; /** @@ -91,7 +91,7 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> { containerData; keyValueContainerData.setDbFile(KeyValueContainerLocationUtil .getContainerDBFile(metdataPath, containerId)); - MetadataStore metadataStore = KeyUtils.getDB(keyValueContainerData, new + MetadataStore metadataStore = BlockUtils.getDB(keyValueContainerData, new OzoneConfiguration()); blockIterator = metadataStore.iterator(); blockFilter = filter; @@ -103,9 +103,9 @@ public class KeyValueBlockIterator implements 
BlockIterator<KeyData> { * @throws IOException */ @Override - public KeyData nextBlock() throws IOException, NoSuchElementException { + public BlockData nextBlock() throws IOException, NoSuchElementException { if (nextBlock != null) { - KeyData currentBlock = nextBlock; + BlockData currentBlock = nextBlock; nextBlock = null; return currentBlock; } @@ -124,7 +124,7 @@ public class KeyValueBlockIterator implements BlockIterator<KeyData> { if (blockIterator.hasNext()) { KeyValue block = blockIterator.next(); if (blockFilter.filterKey(null, block.getKey(), null)) { - nextBlock = KeyUtils.getKeyData(block.getValue()); + nextBlock = BlockUtils.getBlockData(block.getValue()); LOG.trace("Block matching with filter found: blockID is : {} for " + "containerID {}", nextBlock.getLocalID(), containerId); return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 0870c76..09d4054 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -49,7 +49,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import 
org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers .KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; @@ -293,7 +293,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> { // It is ok if this operation takes a bit of time. // Close container is not expected to be instantaneous. try { - MetadataStore db = KeyUtils.getDB(containerData, config); + MetadataStore db = BlockUtils.getDB(containerData, config); db.compactDB(); } catch (StorageContainerException ex) { throw ex; http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 5acecb4..5be6e28 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -48,10 +48,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.helpers.KeyData; import 
org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap; import org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -62,13 +62,13 @@ import org.apache.hadoop.ozone.container.common.volume .RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; -import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils; import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; -import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl; +import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.keyvalue.statemachine.background .BlockDeletingService; import org.apache.hadoop.util.AutoCloseableLock; @@ -117,7 +117,7 @@ public class KeyValueHandler extends Handler { KeyValueHandler.class); private final ContainerType containerType; - private final KeyManager keyManager; + private final BlockManager blockManager; private final ChunkManager chunkManager; private final BlockDeletingService blockDeletingService; private final VolumeChoosingPolicy volumeChoosingPolicy; @@ -129,7 +129,7 @@ public class KeyValueHandler extends Handler { VolumeSet volSet, ContainerMetrics metrics) { super(config, contSet, volSet, metrics); containerType = ContainerType.KeyValueContainer; - keyManager = new KeyManagerImpl(config); + blockManager = new BlockManagerImpl(config); 
chunkManager = new ChunkManagerImpl(); long svcInterval = config .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, @@ -187,13 +187,13 @@ public class KeyValueHandler extends Handler { return handleUnsupportedOp(request); case CloseContainer: return handleCloseContainer(request, kvContainer); - case PutKey: - return handlePutKey(request, kvContainer); - case GetKey: - return handleGetKey(request, kvContainer); - case DeleteKey: - return handleDeleteKey(request, kvContainer); - case ListKey: + case PutBlock: + return handlePutBlock(request, kvContainer); + case GetBlock: + return handleGetBlock(request, kvContainer); + case DeleteBlock: + return handleDeleteBlock(request, kvContainer); + case ListBlock: return handleUnsupportedOp(request); case ReadChunk: return handleReadChunk(request, kvContainer); @@ -222,8 +222,8 @@ public class KeyValueHandler extends Handler { } @VisibleForTesting - public KeyManager getKeyManager() { - return this.keyManager; + public BlockManager getBlockManager() { + return this.blockManager; } /** @@ -413,7 +413,7 @@ public class KeyValueHandler extends Handler { // remove the container from open block map once, all the blocks // have been committed and the container is closed kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING); - commitPendingKeys(kvContainer); + commitPendingBlocks(kvContainer); kvContainer.close(); // make sure the the container open keys from BlockMap gets removed openContainerBlockMap.removeContainer(kvData.getContainerID()); @@ -429,13 +429,13 @@ public class KeyValueHandler extends Handler { } /** - * Handle Put Key operation. Calls KeyManager to process the request. + * Handle Put Block operation. Calls BlockManager to process the request. 
*/ - ContainerCommandResponseProto handlePutKey( + ContainerCommandResponseProto handlePutBlock( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { long blockLength; - if (!request.hasPutKey()) { + if (!request.hasPutBlock()) { LOG.debug("Malformed Put Key request. trace ID: {}", request.getTraceID()); return ContainerUtils.malformedRequest(request); @@ -444,11 +444,11 @@ public class KeyValueHandler extends Handler { try { checkContainerOpen(kvContainer); - KeyData keyData = KeyData.getFromProtoBuf( - request.getPutKey().getKeyData()); - long numBytes = keyData.getProtoBufMessage().toByteArray().length; - blockLength = commitKey(keyData, kvContainer); - metrics.incContainerBytesStats(Type.PutKey, numBytes); + BlockData blockData = BlockData.getFromProtoBuf( + request.getPutBlock().getBlockData()); + long numBytes = blockData.getProtoBufMessage().toByteArray().length; + blockLength = commitKey(blockData, kvContainer); + metrics.incContainerBytesStats(Type.PutBlock, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -457,46 +457,46 @@ public class KeyValueHandler extends Handler { request); } - return KeyUtils.putKeyResponseSuccess(request, blockLength); + return BlockUtils.putBlockResponseSuccess(request, blockLength); } - private void commitPendingKeys(KeyValueContainer kvContainer) + private void commitPendingBlocks(KeyValueContainer kvContainer) throws IOException { long containerId = kvContainer.getContainerData().getContainerID(); - List<KeyData> pendingKeys = - this.openContainerBlockMap.getOpenKeys(containerId); - for(KeyData keyData : pendingKeys) { - commitKey(keyData, kvContainer); + List<BlockData> pendingBlocks = + this.openContainerBlockMap.getOpenBlocks(containerId); + for(BlockData blockData : pendingBlocks) { + commitKey(blockData, kvContainer); } } - private long commitKey(KeyData keyData, KeyValueContainer kvContainer) + private long 
commitKey(BlockData blockData, KeyValueContainer kvContainer) throws IOException { - Preconditions.checkNotNull(keyData); - long length = keyManager.putKey(kvContainer, keyData); + Preconditions.checkNotNull(blockData); + long length = blockManager.putBlock(kvContainer, blockData); //update the open key Map in containerManager - this.openContainerBlockMap.removeFromKeyMap(keyData.getBlockID()); + this.openContainerBlockMap.removeFromBlockMap(blockData.getBlockID()); return length; } /** - * Handle Get Key operation. Calls KeyManager to process the request. + * Handle Get Block operation. Calls BlockManager to process the request. */ - ContainerCommandResponseProto handleGetKey( + ContainerCommandResponseProto handleGetBlock( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - if (!request.hasGetKey()) { + if (!request.hasGetBlock()) { LOG.debug("Malformed Get Key request. trace ID: {}", request.getTraceID()); return ContainerUtils.malformedRequest(request); } - KeyData responseData; + BlockData responseData; try { BlockID blockID = BlockID.getFromProtobuf( - request.getGetKey().getBlockID()); - responseData = keyManager.getKey(kvContainer, blockID); + request.getGetBlock().getBlockID()); + responseData = blockManager.getBlock(kvContainer, blockID); long numBytes = responseData.getProtoBufMessage().toByteArray().length; - metrics.incContainerBytesStats(Type.GetKey, numBytes); + metrics.incContainerBytesStats(Type.GetBlock, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -506,12 +506,12 @@ public class KeyValueHandler extends Handler { request); } - return KeyUtils.getKeyDataResponse(request, responseData); + return BlockUtils.getBlockDataResponse(request, responseData); } /** * Handles GetCommittedBlockLength operation. - * Calls KeyManager to process the request. + * Calls BlockManager to process the request. 
*/ ContainerCommandResponseProto handleGetCommittedBlockLength( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { @@ -530,7 +530,7 @@ public class KeyValueHandler extends Handler { String msg = "Block " + blockID + " is not committed yet."; throw new StorageContainerException(msg, BLOCK_NOT_COMMITTED); } - blockLength = keyManager.getCommittedBlockLength(kvContainer, blockID); + blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -539,16 +539,16 @@ public class KeyValueHandler extends Handler { IO_EXCEPTION), request); } - return KeyUtils.getBlockLengthResponse(request, blockLength); + return BlockUtils.getBlockLengthResponse(request, blockLength); } /** - * Handle Delete Key operation. Calls KeyManager to process the request. + * Handle Delete Block operation. Calls BlockManager to process the request. */ - ContainerCommandResponseProto handleDeleteKey( + ContainerCommandResponseProto handleDeleteBlock( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { - if (!request.hasDeleteKey()) { + if (!request.hasDeleteBlock()) { LOG.debug("Malformed Delete Key request. 
trace ID: {}", request.getTraceID()); return ContainerUtils.malformedRequest(request); @@ -558,9 +558,9 @@ public class KeyValueHandler extends Handler { checkContainerOpen(kvContainer); BlockID blockID = BlockID.getFromProtobuf( - request.getDeleteKey().getBlockID()); + request.getDeleteBlock().getBlockID()); - keyManager.deleteKey(kvContainer, blockID); + blockManager.deleteBlock(kvContainer, blockID); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -569,7 +569,7 @@ public class KeyValueHandler extends Handler { request); } - return KeyUtils.getKeyResponseSuccess(request); + return BlockUtils.getBlockResponseSuccess(request); } /** @@ -698,7 +698,7 @@ public class KeyValueHandler extends Handler { /** * Handle Put Small File operation. Writes the chunk and associated key - * using a single RPC. Calls KeyManager and ChunkManager to process the + * using a single RPC. Calls BlockManager and ChunkManager to process the * request. 
*/ ContainerCommandResponseProto handlePutSmallFile( @@ -715,11 +715,11 @@ public class KeyValueHandler extends Handler { try { checkContainerOpen(kvContainer); - BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getKey() - .getKeyData().getBlockID()); - KeyData keyData = KeyData.getFromProtoBuf( - putSmallFileReq.getKey().getKeyData()); - Preconditions.checkNotNull(keyData); + BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock() + .getBlockData().getBlockID()); + BlockData blockData = BlockData.getFromProtoBuf( + putSmallFileReq.getBlock().getBlockData()); + Preconditions.checkNotNull(blockData); ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf( putSmallFileReq.getChunkInfo()); @@ -732,8 +732,8 @@ public class KeyValueHandler extends Handler { List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); chunks.add(chunkInfo.getProtoBufMessage()); - keyData.setChunks(chunks); - keyManager.putKey(kvContainer, keyData); + blockData.setChunks(chunks); + blockManager.putBlock(kvContainer, blockData); metrics.incContainerBytesStats(Type.PutSmallFile, data.length); } catch (StorageContainerException ex) { @@ -749,7 +749,7 @@ public class KeyValueHandler extends Handler { /** * Handle Get Small File operation. Gets a data stream using a key. This - * helps in reducing the RPC overhead for small files. Calls KeyManager and + * helps in reducing the RPC overhead for small files. Calls BlockManager and * ChunkManager to process the request. 
*/ ContainerCommandResponseProto handleGetSmallFile( @@ -764,9 +764,9 @@ public class KeyValueHandler extends Handler { GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile(); try { - BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getKey() + BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock() .getBlockID()); - KeyData responseData = keyManager.getKey(kvContainer, blockID); + BlockData responseData = blockManager.getBlock(kvContainer, blockID); ContainerProtos.ChunkInfo chunkInfo = null; ByteString dataBuf = ByteString.EMPTY; http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java new file mode 100644 index 0000000..f5cc847 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.keyvalue.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .GetBlockResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + GetCommittedBlockLengthResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. + PutBlockResponseProto; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.utils.ContainerCache; +import org.apache.hadoop.utils.MetadataStore; + +import java.io.IOException; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.NO_SUCH_BLOCK; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.UNABLE_TO_READ_METADATA_DB; + +/** + * Utils functions to help block functions. + */ +public final class BlockUtils { + + /** Never constructed. **/ + private BlockUtils() { + + } + /** + * Get a DB handler for a given container. + * If the handler doesn't exist in cache yet, first create one and + * add into cache. This function is called with containerManager + * ReadLock held. + * + * @param containerData containerData. + * @param conf configuration. + * @return MetadataStore handle. 
+ * @throws StorageContainerException + */ + public static MetadataStore getDB(KeyValueContainerData containerData, + Configuration conf) throws + StorageContainerException { + Preconditions.checkNotNull(containerData); + ContainerCache cache = ContainerCache.getInstance(conf); + Preconditions.checkNotNull(cache); + Preconditions.checkNotNull(containerData.getDbFile()); + try { + return cache.getDB(containerData.getContainerID(), containerData + .getContainerDBType(), containerData.getDbFile().getAbsolutePath()); + } catch (IOException ex) { + String message = String.format("Error opening DB. Container:%s " + + "ContainerPath:%s", containerData.getContainerID(), containerData + .getDbFile().getPath()); + throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB); + } + } + /** + * Remove a DB handler from cache. + * + * @param container - Container data. + * @param conf - Configuration. + */ + public static void removeDB(KeyValueContainerData container, Configuration + conf) { + Preconditions.checkNotNull(container); + ContainerCache cache = ContainerCache.getInstance(conf); + Preconditions.checkNotNull(cache); + cache.removeDB(container.getContainerID()); + } + + /** + * Shutdown all DB Handles. + * + * @param cache - Cache for DB Handles. + */ + @SuppressWarnings("unchecked") + public static void shutdownCache(ContainerCache cache) { + cache.shutdownCache(); + } + + /** + * Parses the {@link BlockData} from a bytes array. + * + * @param bytes Block data in bytes. + * @return Block data. + * @throws IOException if the bytes array is malformed or invalid. 
+ */ + public static BlockData getBlockData(byte[] bytes) throws IOException { + try { + ContainerProtos.BlockData blockData = ContainerProtos.BlockData.parseFrom( + bytes); + BlockData data = BlockData.getFromProtoBuf(blockData); + return data; + } catch (IOException e) { + throw new StorageContainerException("Failed to parse block data from " + + "the bytes array.", NO_SUCH_BLOCK); + } + } + + /** + * Returns putBlock response success. + * @param msg - Request. + * @return Response. + */ + public static ContainerCommandResponseProto putBlockResponseSuccess( + ContainerCommandRequestProto msg, long blockLength) { + GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = + getCommittedBlockLengthResponseBuilder(blockLength, + msg.getPutBlock().getBlockData().getBlockID()); + PutBlockResponseProto.Builder putKeyResponse = + PutBlockResponseProto.newBuilder(); + putKeyResponse + .setCommittedBlockLength(committedBlockLengthResponseBuilder); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setPutBlock(putKeyResponse); + return builder.build(); + } + /** + * Returns successful blockResponse. + * @param msg - Request. + * @return Response. + */ + public static ContainerCommandResponseProto getBlockResponseSuccess( + ContainerCommandRequestProto msg) { + return ContainerUtils.getSuccessResponse(msg); + } + + + public static ContainerCommandResponseProto getBlockDataResponse( + ContainerCommandRequestProto msg, BlockData data) { + GetBlockResponseProto.Builder getBlock = ContainerProtos + .GetBlockResponseProto + .newBuilder(); + getBlock.setBlockData(data.getProtoBufMessage()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setGetBlock(getBlock); + return builder.build(); + } + + /** + * Returns successful getCommittedBlockLength Response. + * @param msg - Request. + * @return Response. 
+ */ + public static ContainerCommandResponseProto getBlockLengthResponse( + ContainerCommandRequestProto msg, long blockLength) { + GetCommittedBlockLengthResponseProto.Builder + committedBlockLengthResponseBuilder = + getCommittedBlockLengthResponseBuilder(blockLength, + msg.getGetCommittedBlockLength().getBlockID()); + ContainerProtos.ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(msg); + builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder); + return builder.build(); + } + + private static GetCommittedBlockLengthResponseProto.Builder + getCommittedBlockLengthResponseBuilder(long blockLength, + ContainerProtos.DatanodeBlockID blockID) { + ContainerProtos.GetCommittedBlockLengthResponseProto.Builder + getCommittedBlockLengthResponseBuilder = ContainerProtos. + GetCommittedBlockLengthResponseProto.newBuilder(); + getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); + getCommittedBlockLengthResponseBuilder.setBlockID(blockID); + return getCommittedBlockLengthResponseBuilder; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/096a7160/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java deleted file mode 100644 index a83d298..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.keyvalue.helpers; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .GetKeyResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - GetCommittedBlockLengthResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. 
- PutKeyResponseProto; -import org.apache.hadoop.hdds.scm.container.common.helpers - .StorageContainerException; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.helpers.KeyData; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.apache.hadoop.ozone.container.common.utils.ContainerCache; -import org.apache.hadoop.utils.MetadataStore; - -import java.io.IOException; - -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.NO_SUCH_KEY; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.UNABLE_TO_READ_METADATA_DB; - -/** - * Utils functions to help key functions. - */ -public final class KeyUtils { - - /** Never constructed. **/ - private KeyUtils() { - - } - /** - * Get a DB handler for a given container. - * If the handler doesn't exist in cache yet, first create one and - * add into cache. This function is called with containerManager - * ReadLock held. - * - * @param containerData containerData. - * @param conf configuration. - * @return MetadataStore handle. - * @throws StorageContainerException - */ - public static MetadataStore getDB(KeyValueContainerData containerData, - Configuration conf) throws - StorageContainerException { - Preconditions.checkNotNull(containerData); - ContainerCache cache = ContainerCache.getInstance(conf); - Preconditions.checkNotNull(cache); - Preconditions.checkNotNull(containerData.getDbFile()); - try { - return cache.getDB(containerData.getContainerID(), containerData - .getContainerDBType(), containerData.getDbFile().getAbsolutePath()); - } catch (IOException ex) { - String message = String.format("Error opening DB. Container:%s " + - "ContainerPath:%s", containerData.getContainerID(), containerData - .getDbFile().getPath()); - throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB); - } - } - /** - * Remove a DB handler from cache. 
- * - * @param container - Container data. - * @param conf - Configuration. - */ - public static void removeDB(KeyValueContainerData container, Configuration - conf) { - Preconditions.checkNotNull(container); - ContainerCache cache = ContainerCache.getInstance(conf); - Preconditions.checkNotNull(cache); - cache.removeDB(container.getContainerID()); - } - - /** - * Shutdown all DB Handles. - * - * @param cache - Cache for DB Handles. - */ - @SuppressWarnings("unchecked") - public static void shutdownCache(ContainerCache cache) { - cache.shutdownCache(); - } - - /** - * Parses the {@link KeyData} from a bytes array. - * - * @param bytes key data in bytes. - * @return key data. - * @throws IOException if the bytes array is malformed or invalid. - */ - public static KeyData getKeyData(byte[] bytes) throws IOException { - try { - ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom( - bytes); - KeyData data = KeyData.getFromProtoBuf(keyData); - return data; - } catch (IOException e) { - throw new StorageContainerException("Failed to parse key data from the" + - " bytes array.", NO_SUCH_KEY); - } - } - - /** - * Returns putKey response success. - * @param msg - Request. - * @return Response. - */ - public static ContainerCommandResponseProto putKeyResponseSuccess( - ContainerCommandRequestProto msg, long blockLength) { - GetCommittedBlockLengthResponseProto.Builder - committedBlockLengthResponseBuilder = - getCommittedBlockLengthResponseBuilder(blockLength, - msg.getPutKey().getKeyData().getBlockID()); - PutKeyResponseProto.Builder putKeyResponse = - PutKeyResponseProto.newBuilder(); - putKeyResponse - .setCommittedBlockLength(committedBlockLengthResponseBuilder); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setPutKey(putKeyResponse); - return builder.build(); - } - /** - * Returns successful keyResponse. - * @param msg - Request. - * @return Response. 
- */ - public static ContainerCommandResponseProto getKeyResponseSuccess( - ContainerCommandRequestProto msg) { - return ContainerUtils.getSuccessResponse(msg); - } - - - public static ContainerCommandResponseProto getKeyDataResponse( - ContainerCommandRequestProto msg, KeyData data) { - GetKeyResponseProto.Builder getKey = ContainerProtos - .GetKeyResponseProto - .newBuilder(); - getKey.setKeyData(data.getProtoBufMessage()); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setGetKey(getKey); - return builder.build(); - } - - /** - * Returns successful getCommittedBlockLength Response. - * @param msg - Request. - * @return Response. - */ - public static ContainerCommandResponseProto getBlockLengthResponse( - ContainerCommandRequestProto msg, long blockLength) { - GetCommittedBlockLengthResponseProto.Builder - committedBlockLengthResponseBuilder = - getCommittedBlockLengthResponseBuilder(blockLength, - msg.getGetCommittedBlockLength().getBlockID()); - ContainerProtos.ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setGetCommittedBlockLength(committedBlockLengthResponseBuilder); - return builder.build(); - } - - private static GetCommittedBlockLengthResponseProto.Builder - getCommittedBlockLengthResponseBuilder( - long blockLength, ContainerProtos.DatanodeBlockID blockID) { - ContainerProtos.GetCommittedBlockLengthResponseProto.Builder - getCommittedBlockLengthResponseBuilder = ContainerProtos. - GetCommittedBlockLengthResponseProto.newBuilder(); - getCommittedBlockLengthResponseBuilder.setBlockLength(blockLength); - getCommittedBlockLengthResponseBuilder.setBlockID(blockID); - return getCommittedBlockLengthResponseBuilder; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
