http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index 4974268..70a0e8a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -23,7 +23,6 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -83,11 +82,10 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB public ContainerResponseProto allocateContainer(RpcController unused, ContainerRequestProto request) throws ServiceException { try { - Pipeline pipeline = impl.allocateContainer(request.getReplicationType(), - request.getReplicationFactor(), request.getContainerName(), - request.getOwner()); + ContainerInfo container = impl.allocateContainer(request.getReplicationType(), + request.getReplicationFactor(), request.getOwner()); return ContainerResponseProto.newBuilder() - .setPipeline(pipeline.getProtobufMessage()) + .setContainerInfo(container.getProtobuf()) .setErrorCode(ContainerResponseProto.Error.success) .build(); @@ -101,9 +99,9 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB RpcController controller, GetContainerRequestProto request) throws ServiceException { try { - Pipeline pipeline = impl.getContainer(request.getContainerName()); + ContainerInfo container = impl.getContainer(request.getContainerID()); return GetContainerResponseProto.newBuilder() - .setPipeline(pipeline.getProtobufMessage()) + .setContainerInfo(container.getProtobuf()) .build(); } catch (IOException e) { throw new ServiceException(e); @@ -114,23 +112,17 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB public SCMListContainerResponseProto listContainer(RpcController controller, SCMListContainerRequestProto request) throws ServiceException { try { - String startName = null; - String prefixName = null; + long startContainerID = 0; int count = -1; // Arguments check. - if (request.hasPrefixName()) { + if (request.hasStartContainerID()) { // End container name is given. - prefixName = request.getPrefixName(); + startContainerID = request.getStartContainerID(); } - if (request.hasStartName()) { - // End container name is given. - startName = request.getStartName(); - } - count = request.getCount(); List<ContainerInfo> containerList = - impl.listContainer(startName, prefixName, count); + impl.listContainer(startContainerID, count); SCMListContainerResponseProto.Builder builder = SCMListContainerResponseProto.newBuilder(); for (ContainerInfo container : containerList) { @@ -147,7 +139,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB RpcController controller, SCMDeleteContainerRequestProto request) throws ServiceException { try { - impl.deleteContainer(request.getContainerName()); + impl.deleteContainer(request.getContainerID()); return SCMDeleteContainerResponseProto.newBuilder().build(); } catch (IOException e) { throw new ServiceException(e); @@ -178,7 +170,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB RpcController controller, ObjectStageChangeRequestProto request) throws ServiceException { try { - impl.notifyObjectStageChange(request.getType(), request.getName(), + impl.notifyObjectStageChange(request.getType(), request.getId(), request.getOp(), request.getStage()); return ObjectStageChangeResponseProto.newBuilder().build(); } catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java index 83ca83d..13b9180 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java @@ -76,7 +76,9 @@ public class LevelDBStore implements MetadataStore { } private void openDB(File dbPath, Options options) throws IOException { - dbPath.getParentFile().mkdirs(); + if (dbPath.getParentFile().mkdirs()) { + LOG.debug("Db path {} created.", dbPath.getParentFile()); + } db = JniDBFactory.factory.open(dbPath, options); if (LOG.isDebugEnabled()) { LOG.debug("LevelDB successfully opened"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java index 3ff0a94..d3a2943 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.utils; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ozone.OzoneConsts; /** @@ -94,8 +94,8 @@ public final class MetadataKeyFilters { if (Strings.isNullOrEmpty(keyPrefix)) { accept = true; } else { - if (currentKey != null && - DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) { + byte [] prefixBytes = keyPrefix.getBytes(); + if (currentKey != null && prefixMatch(prefixBytes, currentKey)) { keysHinted++; accept = true; } else { @@ -114,5 +114,19 @@ public final class MetadataKeyFilters { public int getKeysHintedNum() { return keysHinted; } + + private boolean prefixMatch(byte[] prefix, byte[] key) { + Preconditions.checkNotNull(prefix); + Preconditions.checkNotNull(key); + if (key.length < prefix.length) { + return false; + } + for (int i = 0; i < prefix.length; i++) { + if (key[i] != prefix[i]) { + return false; + } + } + return true; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java index a60e98d..0dfca20 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java @@ -367,6 +367,7 @@ public class RocksDBStore implements MetadataStore { public void close() throws IOException { if (statMBeanName != null) { MBeans.unregister(statMBeanName); + statMBeanName = null; } if (db != null) { db.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index a6270ef..e7494ee 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -197,17 +197,15 @@ message ContainerCommandResponseProto { } message ContainerData { - required string name = 1; + required int64 containerID = 1; repeated KeyValue metadata = 2; optional string dbPath = 3; optional string containerPath = 4; - optional string hash = 6; - optional int64 bytesUsed = 7; - optional int64 size = 8; - optional int64 keyCount = 9; - //TODO: change required after we switch container ID from string to long - optional int64 containerID = 10; - optional LifeCycleState state = 11 [default = OPEN]; + optional string hash = 5; + optional int64 bytesUsed = 6; + optional int64 size = 7; + optional int64 keyCount = 8; + optional LifeCycleState state = 9 [default = OPEN]; } message ContainerMeta { @@ -226,7 +224,7 @@ message CreateContainerResponseProto { message ReadContainerRequestProto { required Pipeline pipeline = 1; - required string name = 2; + required int64 containerID = 2; } message ReadContainerResponseProto { @@ -243,19 +241,16 @@ message UpdateContainerResponseProto { } message DeleteContainerRequestProto { - required Pipeline pipeline = 1; - required string name = 2; - optional bool forceDelete = 3 [default = false]; + required int64 containerID = 1; + optional bool forceDelete = 2 [default = false]; } message DeleteContainerResponseProto { } message ListContainerRequestProto { - required Pipeline pipeline = 1; - optional string prefix = 2; - required uint32 count = 3; // Max Results to return - optional string prevKey = 4; // if this is not set query from start. + required int64 startContainerID = 1; + optional uint32 count = 2; // Max Results to return } message ListContainerResponseProto { @@ -263,34 +258,31 @@ message ListContainerResponseProto { } message CloseContainerRequestProto { - required Pipeline pipeline = 1; + required int64 containerID = 1; } message CloseContainerResponseProto { - optional Pipeline pipeline = 1; optional string hash = 2; + optional int64 containerID = 3; } message KeyData { - required string containerName = 1; - required string name = 2; - optional int64 flags = 3; // for future use. - repeated KeyValue metadata = 4; - repeated ChunkInfo chunks = 5; + required BlockID blockID = 1; + optional int64 flags = 2; // for future use. + repeated KeyValue metadata = 3; + repeated ChunkInfo chunks = 4; } // Key Messages. message PutKeyRequestProto { - required Pipeline pipeline = 1; - required KeyData keyData = 2; + required KeyData keyData = 1; } message PutKeyResponseProto { } message GetKeyRequestProto { - required Pipeline pipeline = 1; - required KeyData keyData = 2; + required KeyData keyData = 1; } message GetKeyResponseProto { @@ -299,17 +291,15 @@ message GetKeyResponseProto { message DeleteKeyRequestProto { - required Pipeline pipeline = 1; - required string name = 2; + required BlockID blockID = 1; } message DeleteKeyResponseProto { } message ListKeyRequestProto { - required Pipeline pipeline = 1; - optional string prefix = 2; // if specified returns keys that match prefix. - required string prevKey = 3; + required int64 containerID = 1; + optional int64 startLocalID = 2; required uint32 count = 4; } @@ -335,31 +325,28 @@ enum Stage { } message WriteChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required ChunkInfo chunkData = 3; - optional bytes data = 4; - optional Stage stage = 5 [default = COMBINED]; + required BlockID blockID = 1; + required ChunkInfo chunkData = 2; + optional bytes data = 3; + optional Stage stage = 4 [default = COMBINED]; } message WriteChunkResponseProto { } message ReadChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required ChunkInfo chunkData = 3; + required BlockID blockID = 1; + required ChunkInfo chunkData = 2; } message ReadChunkResponseProto { - required Pipeline pipeline = 1; + required BlockID blockID = 1; required ChunkInfo chunkData = 2; required bytes data = 3; } message DeleteChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; + required BlockID blockID = 1; required ChunkInfo chunkData = 3; } @@ -367,10 +354,9 @@ message DeleteChunkResponseProto { } message ListChunkRequestProto { - required Pipeline pipeline = 1; - required string keyName = 2; - required string prevChunkName = 3; - required uint32 count = 4; + required BlockID blockID = 1; + required string prevChunkName = 2; + required uint32 count = 3; } message ListChunkResponseProto { @@ -400,7 +386,7 @@ message GetSmallFileResponseProto { } message CopyContainerRequestProto { - required string containerName = 1; + required int64 containerID = 1; required uint64 readOffset = 2; optional uint64 len = 3; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index 38d2e16..7bea82a 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -33,28 +33,6 @@ import "hdds.proto"; // SCM Block protocol -/** - * keys - batch of block keys to find - */ -message GetScmBlockLocationsRequestProto { - repeated string keys = 1; -} - -/** - * locatedBlocks - for each requested hash, nodes that currently host the - * container for that object key hash - */ -message GetScmBlockLocationsResponseProto { - repeated ScmLocatedBlockProto locatedBlocks = 1; -} - -/** - * Holds the nodes that currently host the blocks for a key. - */ -message ScmLocatedBlockProto { - required string key = 1; - required hadoop.hdds.Pipeline pipeline = 2; -} /** * Request send to SCM asking allocate block of specified size. @@ -84,7 +62,7 @@ message DeleteScmKeyBlocksRequestProto { */ message KeyBlocks { required string key = 1; - repeated string blocks = 2; + repeated BlockID blocks = 2; } /** @@ -112,7 +90,7 @@ message DeleteScmBlockResult { unknownFailure = 4; } required Result result = 1; - required string key = 2; + required BlockID blockID = 2; } /** @@ -126,7 +104,7 @@ message AllocateScmBlockResponseProto { unknownFailure = 4; } required Error errorCode = 1; - required string key = 2; + required BlockID blockID = 2; required hadoop.hdds.Pipeline pipeline = 3; required bool createContainer = 4; optional string errorMessage = 5; @@ -139,14 +117,6 @@ message AllocateScmBlockResponseProto { service ScmBlockLocationProtocolService { /** - * Find the set of nodes that currently host the block, as - * identified by the key. This method supports batch lookup by - * passing multiple keys. - */ - rpc getScmBlockLocations(GetScmBlockLocationsRequestProto) - returns (GetScmBlockLocationsResponseProto); - - /** * Creates a block entry in SCM. */ rpc allocateScmBlock(AllocateScmBlockRequestProto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index d7540a3..090e6eb 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -35,7 +35,6 @@ import "hdds.proto"; * Request send to SCM asking where the container should be created. */ message ContainerRequestProto { - required string containerName = 1; // Ozone only support replciation of either 1 or 3. required ReplicationFactor replicationFactor = 2; required ReplicationType replicationType = 3; @@ -53,30 +52,29 @@ message ContainerResponseProto { errorContainerMissing = 3; } required Error errorCode = 1; - required Pipeline pipeline = 2; + required SCMContainerInfo containerInfo = 2; optional string errorMessage = 3; } message GetContainerRequestProto { - required string containerName = 1; + required int64 containerID = 1; } message GetContainerResponseProto { - required Pipeline pipeline = 1; + required SCMContainerInfo containerInfo = 1; } message SCMListContainerRequestProto { required uint32 count = 1; - optional string startName = 2; - optional string prefixName = 3; -} + optional uint64 startContainerID = 2; + } message SCMListContainerResponseProto { repeated SCMContainerInfo containers = 1; } message SCMDeleteContainerRequestProto { - required string containerName = 1; + required int64 containerID = 1; } message SCMDeleteContainerResponseProto { @@ -97,7 +95,7 @@ message ObjectStageChangeRequestProto { begin = 1; complete = 2; } - required string name = 1; + required int64 id = 1; required Type type = 2; required Op op= 3; required Stage stage = 4; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/common/src/main/proto/hdds.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 0b650b4..6ea5727 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -50,7 +50,6 @@ message PipelineChannel { // A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a // container. message Pipeline { - required string containerName = 1; required PipelineChannel pipelineChannel = 2; } @@ -135,8 +134,7 @@ enum LifeCycleEvent { } message SCMContainerInfo { - // TODO : Remove the container name from pipeline. - required string containerName = 1; + required int64 containerID = 1; required LifeCycleState state = 2; required Pipeline pipeline = 3; // This is not total size of container, but space allocated by SCM for @@ -146,7 +144,6 @@ message SCMContainerInfo { required uint64 numberOfKeys = 6; optional int64 stateEnterTime = 7; required string owner = 8; - required int64 containerID = 9; } message GetScmInfoRequestProto { @@ -168,3 +165,11 @@ enum ReplicationFactor { ONE = 1; THREE = 3; } + +/** + * Block ID that uniquely identify a block by SCM. + */ +message BlockID { + required int64 containerID = 1; + required int64 localID = 2; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java index 68bf442..8c5609d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; @@ -105,18 +104,17 @@ public final class ChunkUtils { * Validates chunk data and returns a file object to Chunk File that we are * expected to write data to. * - * @param pipeline - pipeline. * @param data - container data. * @param info - chunk info. * @return File * @throws StorageContainerException */ - public static File validateChunk(Pipeline pipeline, ContainerData data, + public static File validateChunk(ContainerData data, ChunkInfo info) throws StorageContainerException { Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - File chunkFile = getChunkFile(pipeline, data, info); + File chunkFile = getChunkFile(data, info); if (ChunkUtils.isOverWriteRequested(chunkFile, info)) { if (!ChunkUtils.isOverWritePermitted(info)) { log.error("Rejecting write chunk request. Chunk overwrite " + @@ -132,21 +130,21 @@ public final class ChunkUtils { /** * Validates that Path to chunk file exists. * - * @param pipeline - Container Info. * @param data - Container Data * @param info - Chunk info * @return - File. * @throws StorageContainerException */ - public static File getChunkFile(Pipeline pipeline, ContainerData data, + public static File getChunkFile(ContainerData data, ChunkInfo info) throws StorageContainerException { + Preconditions.checkNotNull(data, "Container data can't be null"); Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - if (data == null) { - log.error("Invalid container Name: {}", pipeline.getContainerName()); - throw new StorageContainerException("Unable to find the container Name:" + + if (data.getContainerID() < 0) { + log.error("Invalid container id: {}", data.getContainerID()); + throw new StorageContainerException("Unable to find the container id:" + " " + - pipeline.getContainerName(), CONTAINER_NOT_FOUND); + data.getContainerID(), CONTAINER_NOT_FOUND); } File dataDir = ContainerUtils.getDataDirectory(data).toFile(); @@ -335,7 +333,7 @@ public final class ChunkUtils { ContainerProtos.ReadChunkResponseProto.newBuilder(); response.setChunkData(info.getProtoBufMessage()); response.setData(ByteString.copyFrom(data)); - response.setPipeline(msg.getReadChunk().getPipeline()); + response.setBlockID(msg.getReadChunk().getBlockID()); ContainerProtos.ContainerCommandResponseProto.Builder builder = ContainerUtils.getContainerResponse(msg, ContainerProtos.Result http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java index c29374c..c20282a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java @@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ContainerData { - private final String containerName; private final Map<String, String> metadata; private String dbPath; // Path to Level DB Store. // Path to Physical file system where container and checksum are stored. @@ -48,18 +47,18 @@ public class ContainerData { private String hash; private AtomicLong bytesUsed; private long maxSize; - private Long containerID; + private long containerID; private HddsProtos.LifeCycleState state; /** * Constructs a ContainerData Object. * - * @param containerName - Name + * @param containerID - ID + * @param conf - Configuration */ - public ContainerData(String containerName, Long containerID, + public ContainerData(long containerID, Configuration conf) { this.metadata = new TreeMap<>(); - this.containerName = containerName; this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB; this.bytesUsed = new AtomicLong(0L); @@ -76,7 +75,7 @@ public class ContainerData { public static ContainerData getFromProtBuf( ContainerProtos.ContainerData protoData, Configuration conf) throws IOException { - ContainerData data = new ContainerData(protoData.getName(), + ContainerData data = new ContainerData( protoData.getContainerID(), conf); for (int x = 0; x < protoData.getMetadataCount(); x++) { data.addMetadata(protoData.getMetadata(x).getKey(), @@ -117,7 +116,6 @@ public class ContainerData { public ContainerProtos.ContainerData getProtoBufMessage() { ContainerProtos.ContainerData.Builder builder = ContainerProtos .ContainerData.newBuilder(); - builder.setName(this.getContainerName()); builder.setContainerID(this.getContainerID()); if (this.getDBPath() != null) { @@ -157,15 +155,6 @@ public class ContainerData { } /** - * Returns the name of the container. - * - * @return - name - */ - public String getContainerName() { - return containerName; - } - - /** * Adds metadata. */ public void addMetadata(String key, String value) throws IOException { @@ -231,9 +220,11 @@ public class ContainerData { * * @return String Name. */ - public String getName() { - return getContainerName(); - } + // TODO: check the ContainerCache class to see if we are using the ContainerID instead. + /* + public String getName() { + return getContainerID(); + }*/ /** * Get container file path. @@ -255,7 +246,7 @@ public class ContainerData { * Get container ID. * @return - container ID. */ - public synchronized Long getContainerID() { + public synchronized long getContainerID() { return containerID; } @@ -284,7 +275,7 @@ public class ContainerData { // Some thing brain dead for now. name + Time stamp of when we get the close // container message. - setHash(DigestUtils.sha256Hex(this.getContainerName() + + setHash(DigestUtils.sha256Hex(this.getContainerID() + Long.toString(Time.monotonicNow()))); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java index 50d2da3..19634f4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro */ public class ContainerReport { private static final int UNKNOWN = -1; - private final String containerName; private final String finalhash; private long size; private long keyCount; @@ -51,11 +50,11 @@ public class ContainerReport { /** * Constructs the ContainerReport. * - * @param containerName - Container Name. + * @param containerID - Container ID. * @param finalhash - Final Hash. */ - public ContainerReport(String containerName, String finalhash) { - this.containerName = containerName; + public ContainerReport(long containerID, String finalhash) { + this.containerID = containerID; this.finalhash = finalhash; this.size = UNKNOWN; this.keyCount = UNKNOWN; @@ -74,7 +73,7 @@ public class ContainerReport { */ public static ContainerReport getFromProtoBuf(ContainerInfo info) { Preconditions.checkNotNull(info); - ContainerReport report = new ContainerReport(info.getContainerName(), + ContainerReport report = new ContainerReport(info.getContainerID(), info.getFinalhash()); if (info.hasSize()) { report.setSize(info.getSize()); @@ -103,15 +102,6 @@ public class ContainerReport { } /** - * Gets the container name. - * - * @return - Name - */ - public String getContainerName() { - return containerName; - } - - /** * Returns the final signature for this container. * * @return - hash @@ -203,7 +193,6 @@ public class ContainerReport { */ public ContainerInfo getProtoBufMessage() { return ContainerInfo.newBuilder() - .setContainerName(this.getContainerName()) .setKeyCount(this.getKeyCount()) .setSize(this.getSize()) .setUsed(this.getBytesUsed()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index 1818188..e244354 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -184,6 +184,12 @@ public final class ContainerUtils { removeExtension(containerFile.getName())).toString(); } + public static long getContainerIDFromFile(File containerFile) { + Preconditions.checkNotNull(containerFile); + String containerID = getContainerNameFromFile(containerFile); + return Long.parseLong(containerID); + } + /** * Verifies that this in indeed a new container. * @@ -289,8 +295,8 @@ public final class ContainerUtils { */ public static File getMetadataFile(ContainerData containerData, Path location) { - return location.resolve(containerData - .getContainerName().concat(CONTAINER_META)) + return location.resolve(Long.toString(containerData + .getContainerID()).concat(CONTAINER_META)) .toFile(); } @@ -303,8 +309,8 @@ public final class ContainerUtils { */ public static File getContainerFile(ContainerData containerData, Path location) { - return location.resolve(containerData - .getContainerName().concat(CONTAINER_EXTENSION)) + return location.resolve(Long.toString(containerData + .getContainerID()).concat(CONTAINER_EXTENSION)) .toFile(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java index ade162a..9d0ec95 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.helpers; import com.google.common.collect.Maps; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.util.StringUtils; import java.util.List; import java.util.Map; @@ -37,7 +38,7 @@ public final class DeletedContainerBlocksSummary { // value : the number of blocks need to be deleted in this container // if the message contains multiple entries for same block, // blocks will be merged - private final Map<String, Integer> blockSummary; + private final Map<Long, Integer> blockSummary; // total number of blocks in this message private int numOfBlocks; @@ -47,14 +48,14 @@ public final class DeletedContainerBlocksSummary { blockSummary = Maps.newHashMap(); blocks.forEach(entry -> { txSummary.put(entry.getTxID(), entry.getCount()); - if (blockSummary.containsKey(entry.getContainerName())) { - blockSummary.put(entry.getContainerName(), - blockSummary.get(entry.getContainerName()) - + entry.getBlockIDCount()); + if (blockSummary.containsKey(entry.getContainerID())) { + blockSummary.put(entry.getContainerID(), + blockSummary.get(entry.getContainerID()) + + entry.getLocalIDCount()); } else { - blockSummary.put(entry.getContainerName(), entry.getBlockIDCount()); + blockSummary.put(entry.getContainerID(), entry.getLocalIDCount()); } - numOfBlocks += entry.getBlockIDCount(); + numOfBlocks += entry.getLocalIDCount(); }); } @@ -93,9 +94,9 @@ public final class DeletedContainerBlocksSummary { .append("TimesProceed=") .append(blks.getCount()) .append(", ") - .append(blks.getContainerName()) + .append(blks.getContainerID()) .append(" : [") - .append(String.join(",", blks.getBlockIDList())).append("]") + .append(StringUtils.join(',', blks.getLocalIDList())).append("]") .append("\n"); } return sb.toString(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java index 566db02..ec27452 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java @@ -65,7 +65,8 @@ public final class FileUtils { ContainerProtos.ReadChunkResponseProto.newBuilder(); readChunkresponse.setChunkData(info.getProtoBufMessage()); readChunkresponse.setData(ByteString.copyFrom(data)); - readChunkresponse.setPipeline(msg.getGetSmallFile().getKey().getPipeline()); + readChunkresponse.setBlockID(msg.getGetSmallFile().getKey(). + getKeyData().getBlockID()); ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile = ContainerProtos.GetSmallFileResponseProto.newBuilder(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java index 33eb911..dbd5772 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java @@ -63,11 +63,11 @@ public final class KeyUtils { ContainerCache cache = ContainerCache.getInstance(conf); Preconditions.checkNotNull(cache); try { - return cache.getDB(container.getContainerName(), container.getDBPath()); + return cache.getDB(container.getContainerID(), container.getDBPath()); } catch (IOException ex) { String message = String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s", - container.getContainerName(), container.getDBPath(), ex.getMessage()); + container.getContainerID(), container.getDBPath(), ex.getMessage()); throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB); } } @@ -83,7 +83,7 @@ public final class KeyUtils { Preconditions.checkNotNull(container); ContainerCache cache = ContainerCache.getInstance(conf); Preconditions.checkNotNull(cache); - cache.removeDB(container.getContainerName()); + cache.removeDB(container.getContainerID()); } /** * Shutdown all DB Handles. http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java index 457c417..3505196 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java @@ -19,11 +19,11 @@ package org.apache.hadoop.ozone.container.common.impl; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; @@ -66,13 +66,12 @@ public class ChunkManagerImpl implements ChunkManager { /** * writes a given chunk. * - * @param pipeline - Name and the set of machines that make this container. - * @param keyName - Name of the Key. + * @param blockID - ID of the block. * @param info - ChunkInfo. * @throws StorageContainerException */ @Override - public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, + public void writeChunk(BlockID blockID, ChunkInfo info, byte[] data, ContainerProtos.Stage stage) throws StorageContainerException { // we don't want container manager to go away while we are writing chunks. @@ -80,13 +79,13 @@ public class ChunkManagerImpl implements ChunkManager { // TODO : Take keyManager Write lock here. try { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - String containerName = pipeline.getContainerName(); - Preconditions.checkNotNull(containerName, - "Container name cannot be null"); + Preconditions.checkNotNull(blockID, "Block ID cannot be null."); + long containerID = blockID.getContainerID(); + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative"); ContainerData container = - containerManager.readContainer(containerName); - File chunkFile = ChunkUtils.validateChunk(pipeline, container, info); + containerManager.readContainer(containerID); + File chunkFile = ChunkUtils.validateChunk(container, info); File tmpChunkFile = getTmpChunkFile(chunkFile, info); LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file", @@ -96,16 +95,16 @@ public class ChunkManagerImpl implements ChunkManager { ChunkUtils.writeData(tmpChunkFile, info, data); break; case COMMIT_DATA: - commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen()); + commitChunk(tmpChunkFile, chunkFile, containerID, info.getLen()); break; case COMBINED: // directly write to the chunk file long oldSize = chunkFile.length(); ChunkUtils.writeData(chunkFile, info, data); long newSize = chunkFile.length(); - containerManager.incrBytesUsed(containerName, newSize - oldSize); - containerManager.incrWriteCount(containerName); - containerManager.incrWriteBytes(containerName, info.getLen()); + containerManager.incrBytesUsed(containerID, newSize - oldSize); + containerManager.incrWriteCount(containerID); + containerManager.incrWriteBytes(containerID, info.getLen()); break; default: throw new IOException("Can not identify write operation."); @@ -136,22 +135,21 @@ public class ChunkManagerImpl implements ChunkManager { // Commit the chunk by renaming the temporary chunk file to chunk file private void commitChunk(File tmpChunkFile, File chunkFile, - String containerName, long chunkLen) throws IOException { + long containerID, long chunkLen) throws IOException { long sizeDiff = tmpChunkFile.length() - chunkFile.length(); // It is safe to replace here as the earlier chunk if existing should be // caught as part of validateChunk Files.move(tmpChunkFile.toPath(), chunkFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - containerManager.incrBytesUsed(containerName, sizeDiff); - containerManager.incrWriteCount(containerName); - containerManager.incrWriteBytes(containerName, chunkLen); + containerManager.incrBytesUsed(containerID, sizeDiff); + containerManager.incrWriteCount(containerID); + containerManager.incrWriteBytes(containerID, chunkLen); } /** * reads the data defined by a chunk. * - * @param pipeline - container pipeline. - * @param keyName - Name of the Key + * @param blockID - ID of the block. * @param info - ChunkInfo. * @return byte array * @throws StorageContainerException @@ -159,20 +157,20 @@ public class ChunkManagerImpl implements ChunkManager { * TODO: Explore if we need to do that for ozone. */ @Override - public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) + public byte[] readChunk(BlockID blockID, ChunkInfo info) throws StorageContainerException { containerManager.readLock(); try { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - String containerName = pipeline.getContainerName(); - Preconditions.checkNotNull(containerName, - "Container name cannot be null"); + Preconditions.checkNotNull(blockID, "Block ID cannot be null."); + long containerID = blockID.getContainerID(); + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative"); ContainerData container = - containerManager.readContainer(containerName); - File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info); + containerManager.readContainer(containerID); + File chunkFile = ChunkUtils.getChunkFile(container, info); ByteBuffer data = ChunkUtils.readData(chunkFile, info); - containerManager.incrReadCount(containerName); - containerManager.incrReadBytes(containerName, chunkFile.length()); + containerManager.incrReadCount(containerID); + containerManager.incrReadBytes(containerID, chunkFile.length()); return data.array(); } catch (ExecutionException | NoSuchAlgorithmException e) { LOG.error("read data failed. error: {}", e); @@ -191,25 +189,25 @@ public class ChunkManagerImpl implements ChunkManager { /** * Deletes a given chunk. * - * @param pipeline - Pipeline. - * @param keyName - Key Name + * @param blockID - ID of the block. * @param info - Chunk Info * @throws StorageContainerException */ @Override - public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) + public void deleteChunk(BlockID blockID, ChunkInfo info) throws StorageContainerException { containerManager.readLock(); try { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - String containerName = pipeline.getContainerName(); - Preconditions.checkNotNull(containerName, - "Container name cannot be null"); - File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager - .readContainer(containerName), info); + Preconditions.checkNotNull(blockID, "Block ID cannot be null."); + long containerID = blockID.getContainerID(); + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative"); + + File chunkFile = ChunkUtils.getChunkFile(containerManager + .readContainer(containerID), info); if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) { FileUtil.fullyDelete(chunkFile); - containerManager.decrBytesUsed(containerName, chunkFile.length()); + containerManager.decrBytesUsed(containerID, chunkFile.length()); } else { LOG.error("Not Supported Operation. Trying to delete a " + "chunk that is in shared file. chunk info : " + info.toString()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 5e7375c..1893b3b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -24,7 +24,6 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; @@ -113,7 +112,9 @@ public class ContainerManagerImpl implements ContainerManager { static final Logger LOG = LoggerFactory.getLogger(ContainerManagerImpl.class); - private final ConcurrentSkipListMap<String, ContainerStatus> + // TODO: consider primitive collection like eclipse-collections + // to avoid autoboxing overhead + private final ConcurrentSkipListMap<Long, ContainerStatus> containerMap = new ConcurrentSkipListMap<>(); // Use a non-fair RW lock for better throughput, we may revisit this decision @@ -229,6 +230,7 @@ public class ContainerManagerImpl implements ContainerManager { Preconditions.checkNotNull(keyName, "Container Name to container key mapping is null"); + long containerID = Long.parseLong(keyName); try { String containerFileName = containerName.concat(CONTAINER_EXTENSION); String metaFileName = containerName.concat(CONTAINER_META); @@ -249,7 +251,7 @@ public class ContainerManagerImpl implements ContainerManager { // when loading the info we get a null, this often means last time // SCM was ending up at some middle phase causing that the metadata // was not populated. Such containers are marked as inactive. - containerMap.put(keyName, new ContainerStatus(null)); + containerMap.put(containerID, new ContainerStatus(null)); return; } containerData = ContainerData.getFromProtBuf(containerDataProto, conf); @@ -263,7 +265,7 @@ public class ContainerManagerImpl implements ContainerManager { // Hopefully SCM will ask us to delete this container and rebuild it. LOG.error("Invalid SHA found for container data. Name :{}" + "cowardly refusing to read invalid data", containerName); - containerMap.put(keyName, new ContainerStatus(null)); + containerMap.put(containerID, new ContainerStatus(null)); return; } @@ -295,7 +297,7 @@ public class ContainerManagerImpl implements ContainerManager { }).sum(); containerStatus.setBytesUsed(bytesUsed); - containerMap.put(keyName, containerStatus); + containerMap.put(containerID, containerStatus); } catch (IOException | NoSuchAlgorithmException ex) { LOG.error("read failed for file: {} ex: {}", containerName, ex.getMessage()); @@ -303,7 +305,7 @@ public class ContainerManagerImpl implements ContainerManager { // TODO : Add this file to a recovery Queue. // Remember that this container is busted and we cannot use it. - containerMap.put(keyName, new ContainerStatus(null)); + containerMap.put(containerID, new ContainerStatus(null)); throw new StorageContainerException("Unable to read container info", UNABLE_TO_READ_METADATA_DB); } finally { @@ -316,18 +318,17 @@ public class ContainerManagerImpl implements ContainerManager { /** * Creates a container with the given name. * - * @param pipeline -- Nodes which make up this container. * @param containerData - Container Name and metadata. * @throws StorageContainerException - Exception */ @Override - public void createContainer(Pipeline pipeline, ContainerData containerData) + public void createContainer(ContainerData containerData) throws StorageContainerException { Preconditions.checkNotNull(containerData, "Container data cannot be null"); writeLock(); try { - if (containerMap.containsKey(containerData.getName())) { - LOG.debug("container already exists. {}", containerData.getName()); + if (containerMap.containsKey(containerData.getContainerID())) { + LOG.debug("container already exists. {}", containerData.getContainerID()); throw new StorageContainerException("container already exists.", CONTAINER_EXISTS); } @@ -399,7 +400,7 @@ public class ContainerManagerImpl implements ContainerManager { location); File metadataFile = ContainerUtils.getMetadataFile(containerData, location); - String containerName = containerData.getContainerName(); + String containerName = Long.toString(containerData.getContainerID()); if(!overwrite) { ContainerUtils.verifyIsNewContainer(containerFile, metadataFile); @@ -446,7 +447,7 @@ public class ContainerManagerImpl implements ContainerManager { LOG.error("Creation of container failed. Name: {}, we might need to " + "cleanup partially created artifacts. ", - containerData.getContainerName(), ex); + containerData.getContainerID(), ex); throw new StorageContainerException("Container creation failed. ", ex, CONTAINER_INTERNAL_ERROR); } finally { @@ -459,45 +460,45 @@ public class ContainerManagerImpl implements ContainerManager { /** * Deletes an existing container. * - * @param pipeline - nodes that make this container. - * @param containerName - name of the container. + * @param containerID - ID of the container. * @param forceDelete - whether this container should be deleted forcibly. * @throws StorageContainerException */ @Override - public void deleteContainer(Pipeline pipeline, String containerName, + public void deleteContainer(long containerID, boolean forceDelete) throws StorageContainerException { - Preconditions.checkNotNull(containerName, "Container name cannot be null"); - Preconditions.checkState(containerName.length() > 0, - "Container name length cannot be zero."); + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative."); writeLock(); try { - if (isOpen(pipeline.getContainerName())) { + if (isOpen(containerID)) { throw new StorageContainerException( "Deleting an open container is not allowed.", UNCLOSED_CONTAINER_IO); } - ContainerStatus status = containerMap.get(containerName); + ContainerStatus status = containerMap.get(containerID); if (status == null) { - LOG.debug("No such container. Name: {}", containerName); - throw new StorageContainerException("No such container. Name : " + - containerName, CONTAINER_NOT_FOUND); + LOG.debug("No such container. ID: {}", containerID); + throw new StorageContainerException("No such container. ID : " + + containerID, CONTAINER_NOT_FOUND); } if (status.getContainer() == null) { - LOG.debug("Invalid container data. Name: {}", containerName); + LOG.debug("Invalid container data. ID: {}", containerID); throw new StorageContainerException("Invalid container data. Name : " + - containerName, CONTAINER_NOT_FOUND); + containerID, CONTAINER_NOT_FOUND); } ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete); - containerMap.remove(containerName); + containerMap.remove(containerID); } catch (StorageContainerException e) { throw e; } catch (IOException e) { // TODO : An I/O error during delete can leave partial artifacts on the // disk. We will need the cleaner thread to cleanup this information. - LOG.error("Failed to cleanup container. Name: {}", containerName, e); - throw new StorageContainerException(containerName, e, IO_EXCEPTION); + String errMsg = String.format("Failed to cleanup container. ID: %d", + containerID); + LOG.error(errMsg, e); + throw new StorageContainerException(errMsg, e, IO_EXCEPTION); } finally { writeUnlock(); } @@ -511,25 +512,29 @@ public class ContainerManagerImpl implements ContainerManager { * time. It is possible that using this iteration you can miss certain * container from the listing. * - * @param prefix - Return keys that match this prefix. + * @param startContainerID - Return containers with ID >= startContainerID. * @param count - how many to return - * @param prevKey - Previous Key Value or empty String. * @param data - Actual containerData * @throws StorageContainerException */ @Override - public void listContainer(String prefix, long count, String prevKey, + public void listContainer(long startContainerID, long count, List<ContainerData> data) throws StorageContainerException { - // TODO : Support list with Prefix and PrevKey Preconditions.checkNotNull(data, "Internal assertion: data cannot be null"); + Preconditions.checkState(startContainerID >= 0, + "Start container ID cannot be negative"); + Preconditions.checkState(count > 0, + "max number of containers returned " + + "must be positive"); + readLock(); try { - ConcurrentNavigableMap<String, ContainerStatus> map; - if (prevKey == null || prevKey.isEmpty()) { + ConcurrentNavigableMap<Long, ContainerStatus> map; + if (startContainerID == 0) { map = containerMap.tailMap(containerMap.firstKey(), true); } else { - map = containerMap.tailMap(prevKey, false); + map = containerMap.tailMap(startContainerID, false); } int currentCount = 0; @@ -549,24 +554,23 @@ public class ContainerManagerImpl implements ContainerManager { /** * Get metadata about a specific container. * - * @param containerName - Name of the container + * @param containerID - ID of the container * @return ContainerData - Container Data. * @throws StorageContainerException */ @Override - public ContainerData readContainer(String containerName) throws - StorageContainerException { - Preconditions.checkNotNull(containerName, "Container name cannot be null"); - Preconditions.checkState(containerName.length() > 0, - "Container name length cannot be zero."); - if (!containerMap.containsKey(containerName)) { - throw new StorageContainerException("Unable to find the container. Name: " - + containerName, CONTAINER_NOT_FOUND); + public ContainerData readContainer(long containerID) + throws StorageContainerException { + Preconditions.checkState(containerID >= 0, + "Container ID cannot be negative."); + if (!containerMap.containsKey(containerID)) { + throw new StorageContainerException("Unable to find the container. ID: " + + containerID, CONTAINER_NOT_FOUND); } - ContainerData cData = containerMap.get(containerName).getContainer(); + ContainerData cData = containerMap.get(containerID).getContainer(); if (cData == null) { - throw new StorageContainerException("Invalid container data. Name: " - + containerName, CONTAINER_INTERNAL_ERROR); + throw new StorageContainerException("Invalid container data. ID: " + + containerID, CONTAINER_INTERNAL_ERROR); } return cData; } @@ -575,13 +579,13 @@ public class ContainerManagerImpl implements ContainerManager { * Closes a open container, if it is already closed or does not exist a * StorageContainerException is thrown. * - * @param containerName - Name of the container. + * @param containerID - ID of the container. * @throws StorageContainerException */ @Override - public void closeContainer(String containerName) + public void closeContainer(long containerID) throws StorageContainerException, NoSuchAlgorithmException { - ContainerData containerData = readContainer(containerName); + ContainerData containerData = readContainer(containerID); containerData.closeContainer(); writeContainerInfo(containerData, true); MetadataStore db = KeyUtils.getDB(containerData, conf); @@ -602,15 +606,13 @@ public class ContainerManagerImpl implements ContainerManager { // issues. ContainerStatus status = new ContainerStatus(containerData); - containerMap.put(containerName, status); + containerMap.put(containerID, status); } @Override - public void updateContainer(Pipeline pipeline, String containerName, - ContainerData data, boolean forceUpdate) - throws StorageContainerException { - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - Preconditions.checkNotNull(containerName, "Container name cannot be null"); + public void updateContainer(long containerID, ContainerData data, + boolean forceUpdate) throws StorageContainerException { + Preconditions.checkState(containerID >= 0, "Container ID cannot be negative."); Preconditions.checkNotNull(data, "Container data cannot be null"); FileOutputStream containerStream = null; DigestOutputStream dos = null; @@ -618,9 +620,9 @@ public class ContainerManagerImpl implements ContainerManager { File containerFileBK = null, containerFile = null; boolean deleted = false; - if(!containerMap.containsKey(containerName)) { + if(!containerMap.containsKey(containerID)) { throw new StorageContainerException("Container doesn't exist. Name :" - + containerName, CONTAINER_NOT_FOUND); + + containerID, CONTAINER_NOT_FOUND); } try { @@ -633,7 +635,7 @@ public class ContainerManagerImpl implements ContainerManager { try { Path location = locationManager.getContainerPath(); - ContainerData orgData = containerMap.get(containerName).getContainer(); + ContainerData orgData = containerMap.get(containerID).getContainer(); if (orgData == null) { // updating a invalid container throw new StorageContainerException("Update a container with invalid" + @@ -642,7 +644,7 @@ public class ContainerManagerImpl implements ContainerManager { if (!forceUpdate && !orgData.isOpen()) { throw new StorageContainerException( - "Update a closed container is not allowed. Name: " + containerName, + "Update a closed container is not allowed. ID: " + containerID, UNSUPPORTED_REQUEST); } @@ -652,7 +654,7 @@ public class ContainerManagerImpl implements ContainerManager { if (!forceUpdate) { if (!containerFile.exists() || !containerFile.canWrite()) { throw new StorageContainerException( - "Container file not exists or corrupted. Name: " + containerName, + "Container file not exists or corrupted. ID: " + containerID, CONTAINER_INTERNAL_ERROR); } @@ -672,7 +674,7 @@ public class ContainerManagerImpl implements ContainerManager { // Update the in-memory map ContainerStatus newStatus = new ContainerStatus(data); - containerMap.replace(containerName, newStatus); + containerMap.replace(containerID, newStatus); } catch (IOException e) { // Restore the container file from backup if(containerFileBK != null && containerFileBK.exists() && deleted) { @@ -683,8 +685,8 @@ public class ContainerManagerImpl implements ContainerManager { CONTAINER_INTERNAL_ERROR); } else { throw new StorageContainerException( - "Failed to restore container data from the backup. Name: " - + containerName, CONTAINER_INTERNAL_ERROR); + "Failed to restore container data from the backup. ID: " + + containerID, CONTAINER_INTERNAL_ERROR); } } else { throw new StorageContainerException( @@ -711,22 +713,22 @@ public class ContainerManagerImpl implements ContainerManager { /** * Checks if a container exists. * - * @param containerName - Name of the container. + * @param containerID - ID of the container. * @return true if the container is open false otherwise. * @throws StorageContainerException - Throws Exception if we are not able to * find the container. */ @Override - public boolean isOpen(String containerName) throws StorageContainerException { - final ContainerStatus status = containerMap.get(containerName); + public boolean isOpen(long containerID) throws StorageContainerException { + final ContainerStatus status = containerMap.get(containerID); if (status == null) { throw new StorageContainerException( - "Container status not found: " + containerName, CONTAINER_NOT_FOUND); + "Container status not found: " + containerID, CONTAINER_NOT_FOUND); } final ContainerData cData = status.getContainer(); if (cData == null) { throw new StorageContainerException( - "Container not found: " + containerName, CONTAINER_NOT_FOUND); + "Container not found: " + containerID, CONTAINER_NOT_FOUND); } return cData.isOpen(); } @@ -746,7 +748,7 @@ public class ContainerManagerImpl implements ContainerManager { @VisibleForTesting - public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() { + public ConcurrentSkipListMap<Long, ContainerStatus> getContainerMap() { return containerMap; } @@ -901,7 +903,7 @@ public class ContainerManagerImpl implements ContainerManager { for (ContainerStatus container: containers) { StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder = StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); - ciBuilder.setContainerName(container.getContainer().getContainerName()) + ciBuilder.setContainerID(container.getContainer().getContainerID()) .setSize(container.getContainer().getMaxSize()) .setUsed(container.getContainer().getBytesUsed()) .setKeyCount(container.getContainer().getKeyCount()) @@ -966,7 +968,7 @@ public class ContainerManagerImpl implements ContainerManager { } @Override - public void incrPendingDeletionBlocks(int numBlocks, String containerId) { + public void incrPendingDeletionBlocks(int numBlocks, long containerId) { writeLock(); try { ContainerStatus status = containerMap.get(containerId); @@ -977,7 +979,7 @@ public class ContainerManagerImpl implements ContainerManager { } @Override - public void decrPendingDeletionBlocks(int numBlocks, String containerId) { + public void decrPendingDeletionBlocks(int numBlocks, long containerId) { writeLock(); try { ContainerStatus status = containerMap.get(containerId); @@ -990,35 +992,35 @@ public class ContainerManagerImpl implements ContainerManager { /** * Increase the read count of the container. * - * @param containerName - Name of the container. + * @param containerId - ID of the container. */ @Override - public void incrReadCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public void incrReadCount(long containerId) { + ContainerStatus status = containerMap.get(containerId); status.incrReadCount(); } - public long getReadCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public long getReadCount(long containerId) { + ContainerStatus status = containerMap.get(containerId); return status.getReadCount(); } /** * Increse the read counter for bytes read from the container. * - * @param containerName - Name of the container. + * @param containerId - ID of the container. * @param readBytes - bytes read from the container. */ @Override - public void incrReadBytes(String containerName, long readBytes) { - ContainerStatus status = containerMap.get(containerName); + public void incrReadBytes(long containerId, long readBytes) { + ContainerStatus status = containerMap.get(containerId); status.incrReadBytes(readBytes); } - public long getReadBytes(String containerName) { + public long getReadBytes(long containerId) { readLock(); try { - ContainerStatus status = containerMap.get(containerName); + ContainerStatus status = containerMap.get(containerId); return status.getReadBytes(); } finally { readUnlock(); @@ -1028,76 +1030,76 @@ public class ContainerManagerImpl implements ContainerManager { /** * Increase the write count of the container. * - * @param containerName - Name of the container. + * @param containerId - Name of the container. */ @Override - public void incrWriteCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public void incrWriteCount(long containerId) { + ContainerStatus status = containerMap.get(containerId); status.incrWriteCount(); } - public long getWriteCount(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public long getWriteCount(long containerId) { + ContainerStatus status = containerMap.get(containerId); return status.getWriteCount(); } /** * Increse the write counter for bytes write into the container. * - * @param containerName - Name of the container. + * @param containerId - ID of the container. * @param writeBytes - bytes write into the container. */ @Override - public void incrWriteBytes(String containerName, long writeBytes) { - ContainerStatus status = containerMap.get(containerName); + public void incrWriteBytes(long containerId, long writeBytes) { + ContainerStatus status = containerMap.get(containerId); status.incrWriteBytes(writeBytes); } - public long getWriteBytes(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public long getWriteBytes(long containerId) { + ContainerStatus status = containerMap.get(containerId); return status.getWriteBytes(); } /** * Increase the bytes used by the container. * - * @param containerName - Name of the container. + * @param containerId - ID of the container. * @param used - additional bytes used by the container. * @return the current bytes used. */ @Override - public long incrBytesUsed(String containerName, long used) { - ContainerStatus status = containerMap.get(containerName); + public long incrBytesUsed(long containerId, long used) { + ContainerStatus status = containerMap.get(containerId); return status.incrBytesUsed(used); } /** * Decrease the bytes used by the container. * - * @param containerName - Name of the container. + * @param containerId - ID of the container. * @param used - additional bytes reclaimed by the container. * @return the current bytes used. */ @Override - public long decrBytesUsed(String containerName, long used) { - ContainerStatus status = containerMap.get(containerName); + public long decrBytesUsed(long containerId, long used) { + ContainerStatus status = containerMap.get(containerId); return status.decrBytesUsed(used); } - public long getBytesUsed(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public long getBytesUsed(long containerId) { + ContainerStatus status = containerMap.get(containerId); return status.getBytesUsed(); } /** * Get the number of keys in the container. * - * @param containerName - Name of the container. + * @param containerId - ID of the container. * @return the current key count. */ @Override - public long getNumKeys(String containerName) { - ContainerStatus status = containerMap.get(containerName); + public long getNumKeys(long containerId) { + ContainerStatus status = containerMap.get(containerId); return status.getNumKeys(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index d319565..46bd842 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; @@ -186,7 +187,7 @@ public class Dispatcher implements ContainerDispatcher { } catch (IOException ex) { LOG.warn("Container operation failed. " + "Container: {} Operation: {} trace ID: {} Error: {}", - msg.getCreateContainer().getContainerData().getName(), + msg.getCreateContainer().getContainerData().getContainerID(), msg.getCmdType().name(), msg.getTraceID(), ex.toString(), ex); @@ -230,7 +231,7 @@ public class Dispatcher implements ContainerDispatcher { } catch (IOException ex) { LOG.warn("Container operation failed. " + "Container: {} Operation: {} trace ID: {} Error: {}", - msg.getCreateContainer().getContainerData().getName(), + msg.getCreateContainer().getContainerData().getContainerID(), msg.getCmdType().name(), msg.getTraceID(), ex.toString(), ex); @@ -273,7 +274,7 @@ public class Dispatcher implements ContainerDispatcher { } catch (IOException ex) { LOG.warn("Container operation failed. " + "Container: {} Operation: {} trace ID: {} Error: {}", - msg.getCreateContainer().getContainerData().getName(), + msg.getCreateContainer().getContainerData().getContainerID(), msg.getCmdType().name(), msg.getTraceID(), ex.toString(), ex); @@ -318,17 +319,14 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getUpdateContainer().getPipeline()); - String containerName = msg.getUpdateContainer() - .getContainerData().getName(); + long containerID = msg.getUpdateContainer() + .getContainerData().getContainerID(); ContainerData data = ContainerData.getFromProtBuf( msg.getUpdateContainer().getContainerData(), conf); boolean forceUpdate = msg.getUpdateContainer().getForceUpdate(); - this.containerManager.updateContainer( - pipeline, containerName, data, forceUpdate); + this.containerManager.updateContainer(containerID, + data, forceUpdate); return ContainerUtils.getContainerResponse(msg); } @@ -349,8 +347,9 @@ public class Dispatcher implements ContainerDispatcher { return ContainerUtils.malformedRequest(msg); } - String name = msg.getReadContainer().getName(); - ContainerData container = this.containerManager.readContainer(name); + long containerID = msg.getReadContainer().getContainerID(); + ContainerData container = this.containerManager. + readContainer(containerID); return ContainerUtils.getReadContainerResponse(msg, container); } @@ -370,12 +369,9 @@ public class Dispatcher implements ContainerDispatcher { return ContainerUtils.malformedRequest(msg); } - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getDeleteContainer().getPipeline()); - Preconditions.checkNotNull(pipeline); - String name = msg.getDeleteContainer().getName(); + long containerID = msg.getDeleteContainer().getContainerID(); boolean forceDelete = msg.getDeleteContainer().getForceDelete(); - this.containerManager.deleteContainer(pipeline, name, forceDelete); + this.containerManager.deleteContainer(containerID, forceDelete); return ContainerUtils.getContainerResponse(msg); } @@ -401,7 +397,7 @@ public class Dispatcher implements ContainerDispatcher { msg.getCreateContainer().getPipeline()); Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - this.containerManager.createContainer(pipeline, cData); + this.containerManager.createContainer(cData); return ContainerUtils.getContainerResponse(msg); } @@ -420,14 +416,12 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer() - .getPipeline()); - Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); - if (!this.containerManager.isOpen(pipeline.getContainerName())) { + long containerID = msg.getCloseContainer().getContainerID(); + if (!this.containerManager.isOpen(containerID)) { throw new StorageContainerException("Attempting to close a closed " + "container.", CLOSED_CONTAINER_IO); } - this.containerManager.closeContainer(pipeline.getContainerName()); + this.containerManager.closeContainer(containerID); return ContainerUtils.getContainerResponse(msg); } catch (NoSuchAlgorithmException e) { throw new StorageContainerException("No such Algorithm", e, @@ -449,11 +443,9 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - String keyName = msg.getWriteChunk().getKeyName(); - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getWriteChunk().getPipeline()); - Preconditions.checkNotNull(pipeline); - if (!this.containerManager.isOpen(pipeline.getContainerName())) { + BlockID blockID = BlockID.getFromProtobuf( + msg.getWriteChunk().getBlockID()); + if (!this.containerManager.isOpen(blockID.getContainerID())) { throw new StorageContainerException("Write to closed container.", CLOSED_CONTAINER_IO); } @@ -469,7 +461,7 @@ public class Dispatcher implements ContainerDispatcher { } this.containerManager.getChunkManager() - .writeChunk(pipeline, keyName, chunkInfo, + .writeChunk(blockID, chunkInfo, data, msg.getWriteChunk().getStage()); return ChunkUtils.getChunkResponse(msg); @@ -489,17 +481,13 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - - String keyName = msg.getReadChunk().getKeyName(); - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getReadChunk().getPipeline()); - Preconditions.checkNotNull(pipeline); - + BlockID blockID = BlockID.getFromProtobuf( + msg.getReadChunk().getBlockID()); ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk() .getChunkData()); Preconditions.checkNotNull(chunkInfo); - byte[] data = this.containerManager.getChunkManager().readChunk(pipeline, - keyName, chunkInfo); + byte[] data = this.containerManager.getChunkManager(). + readChunk(blockID, chunkInfo); metrics.incContainerBytesStats(Type.ReadChunk, data.length); return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo); } @@ -519,11 +507,10 @@ public class Dispatcher implements ContainerDispatcher { return ContainerUtils.malformedRequest(msg); } - String keyName = msg.getDeleteChunk().getKeyName(); - Pipeline pipeline = Pipeline.getFromProtoBuf( - msg.getDeleteChunk().getPipeline()); - Preconditions.checkNotNull(pipeline); - if (!this.containerManager.isOpen(pipeline.getContainerName())) { + BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteChunk() + .getBlockID()); + long containerID = blockID.getContainerID(); + if (!this.containerManager.isOpen(containerID)) { throw new StorageContainerException("Write to closed container.", CLOSED_CONTAINER_IO); } @@ -531,7 +518,7 @@ public class Dispatcher implements ContainerDispatcher { .getChunkData()); Preconditions.checkNotNull(chunkInfo); - this.containerManager.getChunkManager().deleteChunk(pipeline, keyName, + this.containerManager.getChunkManager().deleteChunk(blockID, chunkInfo); return ChunkUtils.getChunkResponse(msg); } @@ -550,15 +537,16 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline()); - Preconditions.checkNotNull(pipeline); - if (!this.containerManager.isOpen(pipeline.getContainerName())) { + BlockID blockID = BlockID.getFromProtobuf( + msg.getPutKey().getKeyData().getBlockID()); + long containerID = blockID.getContainerID(); + if (!this.containerManager.isOpen(containerID)) { throw new StorageContainerException("Write to closed container.", CLOSED_CONTAINER_IO); } KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData()); Preconditions.checkNotNull(keyData); - this.containerManager.getKeyManager().putKey(pipeline, keyData); + this.containerManager.getKeyManager().putKey(keyData); long numBytes = keyData.getProtoBufMessage().toByteArray().length; metrics.incContainerBytesStats(Type.PutKey, numBytes); return KeyUtils.getKeyResponse(msg); @@ -601,17 +589,15 @@ public class Dispatcher implements ContainerDispatcher { msg.getTraceID()); return ContainerUtils.malformedRequest(msg); } - Pipeline pipeline = - Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline()); - Preconditions.checkNotNull(pipeline); - if (!this.containerManager.isOpen(pipeline.getContainerName())) { + BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteKey() + .getBlockID()); + Preconditions.checkNotNull(blockID); + long containerID = blockID.getContainerID(); + if (!this.containerManager.isOpen(containerID)) { throw new StorageContainerException("Write to closed container.", CLOSED_CONTAINER_IO); } - String keyName = msg.getDeleteKey().getName(); - Preconditions.checkNotNull(keyName); - Preconditions.checkState(!keyName.isEmpty()); - this.containerManager.getKeyManager().deleteKey(pipeline, keyName); + this.containerManager.getKeyManager().deleteKey(blockID); return KeyUtils.getKeyResponse(msg); } @@ -632,12 +618,11 @@ public class Dispatcher implements ContainerDispatcher { } try { - Pipeline pipeline = - Pipeline.getFromProtoBuf(msg.getPutSmallFile() - .getKey().getPipeline()); + BlockID blockID = BlockID.getFromProtobuf(msg. + getPutSmallFile().getKey().getKeyData().getBlockID()); + long containerID = blockID.getContainerID(); - Preconditions.checkNotNull(pipeline); - if (!this.containerManager.isOpen(pipeline.getContainerName())) { + if (!this.containerManager.isOpen(containerID)) { throw new StorageContainerException("Write to closed container.", CLOSED_CONTAINER_IO); } @@ -648,12 +633,12 @@ public class Dispatcher implements ContainerDispatcher { byte[] data = msg.getPutSmallFile().getData().toByteArray(); metrics.incContainerBytesStats(Type.PutSmallFile, data.length); - this.containerManager.getChunkManager().writeChunk(pipeline, keyData - .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED); + this.containerManager.getChunkManager().writeChunk(blockID, + chunkInfo, data, ContainerProtos.Stage.COMBINED); List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); chunks.add(chunkInfo.getProtoBufMessage()); keyData.setChunks(chunks); - this.containerManager.getKeyManager().putKey(pipeline, keyData); + this.containerManager.getKeyManager().putKey(keyData); return FileUtils.getPutFileResponse(msg); } catch (StorageContainerException e) { return ContainerUtils.logAndReturnError(LOG, e, msg); @@ -680,12 +665,7 @@ public class Dispatcher implements ContainerDispatcher { return ContainerUtils.malformedRequest(msg); } try { - Pipeline pipeline = - Pipeline.getFromProtoBuf(msg.getGetSmallFile() - .getKey().getPipeline()); - long bytes = 0; - Preconditions.checkNotNull(pipeline); KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile() .getKey().getKeyData()); KeyData data = this.containerManager.getKeyManager().getKey(keyData); @@ -694,9 +674,8 @@ public class Dispatcher implements ContainerDispatcher { bytes += chunk.getSerializedSize(); ByteString current = ByteString.copyFrom(this.containerManager.getChunkManager() - .readChunk( - pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf( - chunk))); + .readChunk(keyData.getBlockID(), + ChunkInfo.getFromProtoBuf(chunk))); dataBuf = dataBuf.concat(current); c = chunk; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org