HDDS-694. Plugin new Pipeline management code in SCM. Contributed by Lokesh Jain.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/874e06e5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/874e06e5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/874e06e5 Branch: refs/heads/HDFS-13891 Commit: 874e06e50e1184aa7b71dde6ecd67dea86765133 Parents: 615f532 Author: Nanda kumar <na...@apache.org> Authored: Fri Oct 26 17:53:47 2018 +0530 Committer: Brahma Reddy Battula <bra...@apache.org> Committed: Tue Oct 30 11:31:17 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 15 +- .../hadoop/hdds/scm/XceiverClientManager.java | 6 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 31 +- .../scm/client/ContainerOperationClient.java | 40 +- .../hadoop/hdds/scm/XceiverClientSpi.java | 2 +- .../hadoop/hdds/scm/client/ScmClient.java | 2 +- .../hdds/scm/container/ContainerInfo.java | 2 +- .../common/helpers/AllocatedBlock.java | 1 + .../common/helpers/ContainerWithPipeline.java | 3 +- .../scm/container/common/helpers/Pipeline.java | 319 ------------ .../container/common/helpers/PipelineID.java | 97 ---- .../hadoop/hdds/scm/pipeline/Pipeline.java | 41 +- .../scm/pipeline/PipelineNotFoundException.java | 28 ++ .../StorageContainerLocationProtocol.java | 2 +- ...kLocationProtocolClientSideTranslatorPB.java | 4 +- ...rLocationProtocolClientSideTranslatorPB.java | 4 +- .../scm/storage/ContainerProtocolCalls.java | 22 +- .../main/java/org/apache/ratis/RatisHelper.java | 15 +- .../transport/server/XceiverServerGrpc.java | 2 +- .../server/ratis/XceiverServerRatis.java | 10 +- .../commands/CloseContainerCommand.java | 2 +- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 2 +- .../block/DatanodeDeletedBlockTransactions.java | 4 +- .../hdds/scm/block/DeletedBlockLogImpl.java | 5 +- .../container/CloseContainerEventHandler.java | 4 +- .../hdds/scm/container/ContainerManager.java | 6 +- 
.../scm/container/ContainerStateManager.java | 25 +- .../hdds/scm/container/SCMContainerManager.java | 59 +-- .../hadoop/hdds/scm/events/SCMEvents.java | 9 - .../hadoop/hdds/scm/node/NodeManager.java | 4 +- .../hadoop/hdds/scm/node/NodeStateManager.java | 4 +- .../hadoop/hdds/scm/node/SCMNodeManager.java | 4 +- .../hadoop/hdds/scm/node/StaleNodeHandler.java | 30 +- .../hdds/scm/node/states/Node2PipelineMap.java | 8 +- .../hdds/scm/pipeline/PipelineFactory.java | 6 +- .../hdds/scm/pipeline/PipelineManager.java | 10 +- .../hdds/scm/pipeline/PipelineProvider.java | 2 +- .../scm/pipeline/PipelineReportHandler.java | 16 +- .../hdds/scm/pipeline/PipelineStateManager.java | 27 +- .../hdds/scm/pipeline/PipelineStateMap.java | 91 +++- .../scm/pipeline/RatisPipelineProvider.java | 15 +- .../hdds/scm/pipeline/SCMPipelineManager.java | 37 +- .../scm/pipeline/SimplePipelineProvider.java | 13 +- .../pipelines/PipelineActionEventHandler.java | 62 --- .../scm/pipelines/PipelineCloseHandler.java | 52 -- .../hdds/scm/pipelines/PipelineManager.java | 171 ------- .../scm/pipelines/PipelineReportHandler.java | 59 --- .../hdds/scm/pipelines/PipelineSelector.java | 481 ------------------- .../scm/pipelines/PipelineStateManager.java | 136 ------ .../hadoop/hdds/scm/pipelines/package-info.java | 38 -- .../scm/pipelines/ratis/RatisManagerImpl.java | 129 ----- .../hdds/scm/pipelines/ratis/package-info.java | 18 - .../standalone/StandaloneManagerImpl.java | 122 ----- .../scm/pipelines/standalone/package-info.java | 18 - .../scm/server/SCMClientProtocolServer.java | 2 +- .../scm/server/StorageContainerManager.java | 39 +- .../hadoop/hdds/scm/block/TestBlockManager.java | 9 +- .../hdds/scm/block/TestDeletedBlockLog.java | 32 +- .../hdds/scm/container/MockNodeManager.java | 4 +- .../TestCloseContainerEventHandler.java | 20 +- .../container/TestContainerReportHandler.java | 7 +- .../container/TestContainerStateManager.java | 29 +- .../scm/container/TestSCMContainerManager.java | 24 +- 
.../replication/TestReplicationManager.java | 29 +- .../hdds/scm/node/TestContainerPlacement.java | 8 +- .../hdds/scm/node/TestDeadNodeHandler.java | 7 +- .../hadoop/hdds/scm/node/TestNodeManager.java | 2 +- .../ozone/container/common/TestEndPoint.java | 2 +- .../testutils/ReplicationNodeManagerMock.java | 4 +- .../hdds/scm/cli/container/InfoSubcommand.java | 5 +- .../ozone/client/io/ChunkGroupInputStream.java | 7 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 29 +- .../hdds/scm/pipeline/TestNodeFailure.java | 22 +- .../hdds/scm/pipeline/TestPipelineClose.java | 41 +- .../scm/pipeline/TestPipelineStateManager.java | 171 ++++--- .../scm/pipeline/TestRatisPipelineProvider.java | 13 +- .../scm/pipeline/TestSCMPipelineManager.java | 45 +- .../hdds/scm/pipeline/TestSCMRestart.java | 23 +- .../pipeline/TestSimplePipelineProvider.java | 13 +- .../apache/hadoop/ozone/RatisTestHelper.java | 2 +- .../TestContainerStateMachineIdempotency.java | 2 +- .../hadoop/ozone/TestMiniOzoneCluster.java | 23 +- .../ozone/TestStorageContainerManager.java | 4 +- .../TestStorageContainerManagerHelper.java | 5 +- .../ozone/client/rest/TestOzoneRestClient.java | 4 +- .../rpc/TestCloseContainerHandlingByClient.java | 10 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 6 +- .../ozone/container/ContainerTestHelper.java | 70 +-- .../container/TestContainerReplication.java | 6 +- .../common/impl/TestCloseContainerHandler.java | 10 +- .../TestCloseContainerByPipeline.java | 8 +- .../TestCloseContainerHandler.java | 2 +- .../transport/server/ratis/TestCSMMetrics.java | 4 +- .../container/metrics/TestContainerMetrics.java | 4 +- .../container/ozoneimpl/TestOzoneContainer.java | 9 +- .../ozoneimpl/TestOzoneContainerRatis.java | 6 +- .../container/server/TestContainerServer.java | 21 +- .../hadoop/ozone/scm/TestAllocateContainer.java | 2 +- .../TestGetCommittedBlockLengthAndPutKey.java | 2 +- .../ozone/om/ScmBlockLocationTestIngClient.java | 19 +- .../genesis/BenchMarkContainerStateMap.java | 27 
+- .../hadoop/ozone/scm/TestContainerSQLCli.java | 6 +- 102 files changed, 791 insertions(+), 2363 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 9526be3..cc34e27 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServi import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.util.Time; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.UUID; @@ -84,9 +85,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { public void connect() throws Exception { // leader by default is the 1st datanode in the datanode list of pipleline - DatanodeDetails leader = this.pipeline.getLeader(); + DatanodeDetails dn = this.pipeline.getFirstNode(); // just make a connection to the 1st datanode at the beginning - 
connectToDatanode(leader); + connectToDatanode(dn); } private void connectToDatanode(DatanodeDetails dn) { @@ -148,18 +149,16 @@ public class XceiverClientGrpc extends XceiverClientSpi { public ContainerCommandResponseProto sendCommandWithRetry( ContainerCommandRequestProto request) throws IOException { - int size = pipeline.getMachines().size(); ContainerCommandResponseProto responseProto = null; - DatanodeDetails dn = null; // In case of an exception or an error, we will try to read from the // datanodes in the pipeline in a round robin fashion. // TODO: cache the correct leader info in here, so that any subsequent calls // should first go to leader - for (int dnIndex = 0; dnIndex < size; dnIndex++) { + List<DatanodeDetails> dns = pipeline.getNodes(); + for (DatanodeDetails dn : dns) { try { - dn = pipeline.getMachines().get(dnIndex); LOG.debug("Executing command " + request + " on datanode " + dn); // In case the command gets retried on a 2nd datanode, // sendCommandAsyncCall will create a new channel and async stub @@ -201,7 +200,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { - return sendCommandAsync(request, pipeline.getLeader()); + return sendCommandAsync(request, pipeline.getFirstNode()); } private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 83b5a4c..1973c1d 100644 --- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -25,7 +25,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.Closeable; @@ -115,8 +115,8 @@ public class XceiverClientManager implements Closeable { public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { Preconditions.checkNotNull(pipeline); - Preconditions.checkArgument(pipeline.getMachines() != null); - Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); + Preconditions.checkArgument(pipeline.getNodes() != null); + Preconditions.checkArgument(!pipeline.getNodes().isEmpty()); synchronized (clientCache) { XceiverClientSpi info = getClient(pipeline); http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index d2eb68b..f38fd3b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.proto.RaftProtos; import 
org.apache.ratis.retry.RetryPolicy; @@ -27,7 +26,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; @@ -64,19 +63,6 @@ public final class XceiverClientRatis extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); public static XceiverClientRatis newXceiverClientRatis( - Pipeline pipeline, Configuration ozoneConf) { - final String rpcType = ozoneConf.get( - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(ozoneConf); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - return new XceiverClientRatis(pipeline, - SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, - retryPolicy); - } - - public static XceiverClientRatis newXceiverClientRatis( org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline, Configuration ozoneConf) { final String rpcType = ozoneConf @@ -85,11 +71,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { final int maxOutstandingRequests = HddsClientUtils.getMaxOutstandingRequests(ozoneConf); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - Pipeline pipeline1 = - new Pipeline(pipeline.getNodes().get(0).getUuidString(), - HddsProtos.LifeCycleState.OPEN, pipeline.getType(), - pipeline.getFactor(), PipelineID.valueOf(pipeline.getID().getId())); - return new XceiverClientRatis(pipeline1, + return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(rpcType), 
maxOutstandingRequests, retryPolicy); } @@ -118,7 +100,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { public void createPipeline() throws IOException { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getMachines(), + callRatisRpc(pipeline.getNodes(), (raftClient, peer) -> raftClient.groupAdd(group, peer.getId())); } @@ -128,7 +110,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { public void destroyPipeline() throws IOException { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient + callRatisRpc(pipeline.getNodes(), (raftClient, peer) -> raftClient .groupRemove(group.getGroupId(), true, peer.getId())); } @@ -174,9 +156,8 @@ public final class XceiverClientRatis extends XceiverClientSpi { @Override public void connect() throws Exception { - LOG.debug("Connecting to pipeline:{} leader:{}", - getPipeline().getId(), - RatisHelper.toRaftPeerId(pipeline.getLeader())); + LOG.debug("Connecting to pipeline:{} datanode:{}", getPipeline().getId(), + RatisHelper.toRaftPeerId(pipeline.getFirstNode())); // TODO : XceiverClient ratis should pass the config value of // maxOutstandingRequests so as to set the upper bound on max no of async // requests to be handled by raft client http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index c635df4..25a71df 100644 --- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; @@ -40,11 +40,6 @@ import java.io.IOException; import java.util.List; import java.util.UUID; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState - .ALLOCATED; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState - .OPEN; - /** * This class provides the client-facing APIs of container operations. */ @@ -98,14 +93,10 @@ public class ContainerOperationClient implements ScmClient { Pipeline pipeline = containerWithPipeline.getPipeline(); client = xceiverClientManager.acquireClient(pipeline); - // Allocated State means that SCM has allocated this pipeline in its - // namespace. The client needs to create the pipeline on the machines - // which was choosen by the SCM. 
- Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED || - pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state"); - if (pipeline.getLifeCycleState() == ALLOCATED) { - createPipeline(client, pipeline); - } + Preconditions.checkState(pipeline.isOpen(), String + .format("Unexpected state=%s for pipeline=%s, expected state=%s", + pipeline.getPipelineState(), pipeline.getId(), + Pipeline.PipelineState.OPEN)); createContainer(client, containerWithPipeline.getContainerInfo().getContainerID()); return containerWithPipeline; @@ -142,8 +133,7 @@ public class ContainerOperationClient implements ScmClient { // creation state. if (LOG.isDebugEnabled()) { LOG.debug("Created container " + containerId - + " leader:" + client.getPipeline().getLeader() - + " machines:" + client.getPipeline().getMachines()); + + " machines:" + client.getPipeline().getNodes()); } } @@ -208,12 +198,6 @@ public class ContainerOperationClient implements ScmClient { Pipeline pipeline = containerWithPipeline.getPipeline(); client = xceiverClientManager.acquireClient(pipeline); - // Allocated State means that SCM has allocated this pipeline in its - // namespace. The client needs to create the pipeline on the machines - // which was choosen by the SCM. - if (pipeline.getLifeCycleState() == ALLOCATED) { - createPipeline(client, pipeline); - } // connect to pipeline leader and allocate container on leader datanode. 
client = xceiverClientManager.acquireClient(pipeline); createContainer(client, @@ -283,10 +267,8 @@ public class ContainerOperationClient implements ScmClient { storageContainerLocationClient .deleteContainer(containerId); if (LOG.isDebugEnabled()) { - LOG.debug("Deleted container {}, leader: {}, machines: {} ", - containerId, - pipeline.getLeader(), - pipeline.getMachines()); + LOG.debug("Deleted container {}, machines: {} ", containerId, + pipeline.getNodes()); } } finally { if (client != null) { @@ -336,10 +318,8 @@ public class ContainerOperationClient implements ScmClient { ReadContainerResponseProto response = ContainerProtocolCalls.readContainer(client, containerID, traceID); if (LOG.isDebugEnabled()) { - LOG.debug("Read container {}, leader: {}, machines: {} ", - containerID, - pipeline.getLeader(), - pipeline.getMachines()); + LOG.debug("Read container {}, machines: {} ", containerID, + pipeline.getNodes()); } return response.getContainerData(); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 571d148..b36315e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index c37f42c..3d5d56c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.client; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerData; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java index a5ea3e3..64407a7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -36,7 +36,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.util.Time; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java index 63781a8..f657b74 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/AllocatedBlock.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container.common.helpers; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.client.BlockID; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java index af74a7d..8f49255 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import 
org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; /** * Class wraps ozone container info. @@ -50,7 +51,7 @@ public class ContainerWithPipeline implements Comparator<ContainerWithPipeline>, HddsProtos.ContainerWithPipeline allocatedContainer) { return new ContainerWithPipeline( ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()), - Pipeline.getFromProtoBuf(allocatedContainer.getPipeline())); + Pipeline.getFromProtobuf(allocatedContainer.getPipeline())); } public HddsProtos.ContainerWithPipeline getProtobuf() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java deleted file mode 100644 index b0817f7..0000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.container.common.helpers; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonFilter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.ser.FilterProvider; -import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; -import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.List; - -/** - * A pipeline represents the group of machines over which a container lives. 
- */ -public class Pipeline { - static final String PIPELINE_INFO = "PIPELINE_INFO_FILTER"; - private static final ObjectWriter WRITER; - - static { - ObjectMapper mapper = new ObjectMapper(); - String[] ignorableFieldNames = {"leaderID", "datanodes"}; - FilterProvider filters = new SimpleFilterProvider() - .addFilter(PIPELINE_INFO, SimpleBeanPropertyFilter - .serializeAllExcept(ignorableFieldNames)); - mapper.setVisibility(PropertyAccessor.FIELD, - JsonAutoDetect.Visibility.ANY); - mapper.addMixIn(Object.class, MixIn.class); - - WRITER = mapper.writer(filters); - } - - @JsonIgnore - private String leaderID; - @JsonIgnore - private Map<String, DatanodeDetails> datanodes; - private HddsProtos.LifeCycleState lifeCycleState; - private HddsProtos.ReplicationType type; - private HddsProtos.ReplicationFactor factor; - private PipelineID id; - - /** - * Constructs a new pipeline data structure. - * - * @param leaderID - Leader datanode id - * @param lifeCycleState - Pipeline State - * @param replicationType - Replication protocol - * @param replicationFactor - replication count on datanodes - * @param id - pipeline ID - */ - public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState, - HddsProtos.ReplicationType replicationType, - HddsProtos.ReplicationFactor replicationFactor, PipelineID id) { - this.leaderID = leaderID; - this.lifeCycleState = lifeCycleState; - this.type = replicationType; - this.factor = replicationFactor; - this.id = id; - datanodes = new ConcurrentHashMap<>(); - } - - @Override - public int hashCode() { - return id.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Pipeline that = (Pipeline) o; - - return id.equals(that.id) - && factor.equals(that.factor) - && type.equals(that.type) - && lifeCycleState.equals(that.lifeCycleState) - && leaderID.equals(that.leaderID); - - } - - /** - * Gets pipeline object from 
protobuf. - * - * @param pipelineProto - ProtoBuf definition for the pipeline. - * @return Pipeline Object - */ - public static Pipeline getFromProtoBuf( - HddsProtos.Pipeline pipelineProto) { - Preconditions.checkNotNull(pipelineProto); - Pipeline pipeline = - new Pipeline(pipelineProto.getLeaderID(), - pipelineProto.getState(), - pipelineProto.getType(), - pipelineProto.getFactor(), - PipelineID.getFromProtobuf(pipelineProto.getId())); - - for (HddsProtos.DatanodeDetailsProto dataID : - pipelineProto.getMembersList()) { - pipeline.addMember(DatanodeDetails.getFromProtoBuf(dataID)); - } - return pipeline; - } - - /** - * returns the replication count. - * @return Replication Factor - */ - public HddsProtos.ReplicationFactor getFactor() { - return factor; - } - - /** - * Returns the first machine in the set of datanodes. - * - * @return First Machine. - */ - @JsonIgnore - public DatanodeDetails getLeader() { - return getDatanodes().get(leaderID); - } - - /** - * Adds a datanode to pipeline - * @param datanodeDetails datanode to be added. - * @return true if the dn was not earlier present, false otherwise - */ - public boolean addMember(DatanodeDetails datanodeDetails) { - return datanodes.put(datanodeDetails.getUuid().toString(), - datanodeDetails) == null; - - } - - public void resetPipeline() { - // reset datanodes in pipeline and learn about them through - // pipeline reports on SCM restart - datanodes.clear(); - } - - public Map<String, DatanodeDetails> getDatanodes() { - return datanodes; - } - - public boolean isEmpty() { - return datanodes.isEmpty(); - } - /** - * Returns the leader host. - * - * @return First Machine. - */ - public String getLeaderHost() { - return getDatanodes() - .get(leaderID).getHostName(); - } - - /** - * - * @return lead - */ - public String getLeaderID() { - return leaderID; - } - /** - * Returns all machines that make up this pipeline. - * - * @return List of Machines. 
- */ - @JsonIgnore - public List<DatanodeDetails> getMachines() { - return new ArrayList<>(getDatanodes().values()); - } - - /** - * Returns all machines that make up this pipeline. - * - * @return List of Machines. - */ - public List<String> getDatanodeHosts() { - List<String> dataHosts = new ArrayList<>(); - for (DatanodeDetails datanode : getDatanodes().values()) { - dataHosts.add(datanode.getHostName()); - } - return dataHosts; - } - - /** - * Return a Protobuf Pipeline message from pipeline. - * - * @return Protobuf message - */ - @JsonIgnore - public HddsProtos.Pipeline getProtobufMessage() { - HddsProtos.Pipeline.Builder builder = - HddsProtos.Pipeline.newBuilder(); - for (DatanodeDetails datanode : datanodes.values()) { - builder.addMembers(datanode.getProtoBufMessage()); - } - builder.setLeaderID(leaderID); - - if (lifeCycleState != null) { - builder.setState(lifeCycleState); - } - if (type != null) { - builder.setType(type); - } - - if (factor != null) { - builder.setFactor(factor); - } - - if (id != null) { - builder.setId(id.getProtobuf()); - } - return builder.build(); - } - - /** - * Gets the State of the pipeline. - * - * @return - LifeCycleStates. - */ - public HddsProtos.LifeCycleState getLifeCycleState() { - return lifeCycleState; - } - - /** - * Update the State of the pipeline. - */ - public void setLifeCycleState(HddsProtos.LifeCycleState nextState) { - lifeCycleState = nextState; - } - - /** - * Gets the pipeline id. - * - * @return - Id of the pipeline - */ - public PipelineID getId() { - return id; - } - - /** - * Returns the type. - * - * @return type - Standalone, Ratis, Chained. - */ - public HddsProtos.ReplicationType getType() { - return type; - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(getClass().getSimpleName()) - .append("["); - getDatanodes().keySet().forEach( - node -> b.append(node.endsWith(getLeaderID()) ? 
"*" + id : id)); - b.append(" id:").append(id); - if (getType() != null) { - b.append(" type:").append(getType().toString()); - } - if (getFactor() != null) { - b.append(" factor:").append(getFactor().toString()); - } - if (getLifeCycleState() != null) { - b.append(" State:").append(getLifeCycleState().toString()); - } - return b.toString(); - } - - public void setType(HddsProtos.ReplicationType type) { - this.type = type; - } - - /** - * Returns a JSON string of this object. - * - * @return String - json string - * @throws IOException - */ - public String toJsonString() throws IOException { - return WRITER.writeValueAsString(this); - } - - @JsonFilter(PIPELINE_INFO) - class MixIn { - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java deleted file mode 100644 index 6e27a71..0000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.container.common.helpers; - -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.ratis.protocol.RaftGroupId; - -import java.util.UUID; - -/** - * ID for the pipeline, the ID is based on UUID so that it can be used - * in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize - * the ratis group they are part of. - */ -public final class PipelineID implements Comparable<PipelineID> { - - private UUID id; - private RaftGroupId groupId; - - private PipelineID(UUID id) { - this.id = id; - this.groupId = RaftGroupId.valueOf(id); - } - - public static PipelineID randomId() { - return new PipelineID(UUID.randomUUID()); - } - - public static PipelineID valueOf(UUID id) { - return new PipelineID(id); - } - - public static PipelineID valueOf(RaftGroupId groupId) { - return valueOf(groupId.getUuid()); - } - - public RaftGroupId getRaftGroupID() { - return groupId; - } - - public UUID getId() { - return id; - } - - public HddsProtos.PipelineID getProtobuf() { - return HddsProtos.PipelineID.newBuilder().setId(id.toString()).build(); - } - - public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) { - return new PipelineID(UUID.fromString(protos.getId())); - } - - @Override - public String toString() { - return "pipelineId=" + id; - } - - @Override - public int compareTo(PipelineID o) { - return this.id.compareTo(o.id); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return 
false; - } - - PipelineID that = (PipelineID) o; - - return id.equals(that.id); - } - - @Override - public int hashCode() { - return id.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index b22a0c6..ef055a1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -60,7 +60,7 @@ public final class Pipeline { * * @return PipelineID */ - public PipelineID getID() { + public PipelineID getId() { return id; } @@ -87,11 +87,26 @@ public final class Pipeline { * * @return - LifeCycleStates. */ - PipelineState getPipelineState() { - // TODO: See if we need to expose this. + public PipelineState getPipelineState() { return state; } + /** + * Returns the list of nodes which form this pipeline. + * + * @return List of DatanodeDetails + */ + public List<DatanodeDetails> getNodes() { + return new ArrayList<>(nodeStatus.keySet()); + } + + public DatanodeDetails getFirstNode() throws IOException { + if (nodeStatus.isEmpty()) { + throw new IOException(String.format("Pipeline=%s is empty", id)); + } + return nodeStatus.keySet().iterator().next(); + } + public boolean isClosed() { return state == PipelineState.CLOSED; } @@ -117,13 +132,8 @@ public final class Pipeline { return true; } - /** - * Returns the list of nodes which form this pipeline. 
- * - * @return List of DatanodeDetails - */ - public List<DatanodeDetails> getNodes() { - return new ArrayList<>(nodeStatus.keySet()); + public boolean isEmpty() { + return nodeStatus.isEmpty(); } public HddsProtos.Pipeline getProtobufMessage() { @@ -138,7 +148,7 @@ public final class Pipeline { return builder.build(); } - public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) { + public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) { return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId())) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) @@ -164,8 +174,7 @@ public final class Pipeline { .append(id, that.id) .append(type, that.type) .append(factor, that.factor) - .append(state, that.state) - .append(nodeStatus, that.nodeStatus) + .append(getNodes(), that.getNodes()) .isEquals(); } @@ -175,7 +184,6 @@ public final class Pipeline { .append(id) .append(type) .append(factor) - .append(state) .append(nodeStatus) .toHashCode(); } @@ -244,7 +252,10 @@ public final class Pipeline { } } - enum PipelineState { + /** + * Possible Pipeline states in SCM. + */ + public enum PipelineState { ALLOCATED, OPEN, CLOSED } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineNotFoundException.java new file mode 100644 index 0000000..4568379 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineNotFoundException.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.hdds.scm.pipeline; + +import java.io.IOException; + +/** + * Signals that a pipeline is missing from PipelineManager. 
+ */ +public class PipelineNotFoundException extends IOException{ + /** + * Constructs an {@code PipelineNotFoundException} with {@code null} + * as its error detail message. + */ + public PipelineNotFoundException() { + super(); + } + + /** + * Constructs an {@code PipelineNotFoundException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public PipelineNotFoundException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 712fb7e..82dfe16 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.protocol; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index aed0fb7..e684ae3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -23,7 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos @@ -103,7 +103,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB } AllocatedBlock.Builder builder = new AllocatedBlock.Builder() .setBlockID(BlockID.getFromProtobuf(response.getBlockID())) - .setPipeline(Pipeline.getFromProtoBuf(response.getPipeline())) + .setPipeline(Pipeline.getFromProtobuf(response.getPipeline())) .setShouldCreateContainer(response.getCreateContainer()); return builder.build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 8e723e6..d19efc1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto @@ -292,7 +292,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB PipelineResponseProto.Error.success) { Preconditions.checkState(response.hasPipeline(), "With success, " + "must come a pipeline"); - return Pipeline.getFromProtoBuf(response.getPipeline()); + return Pipeline.getFromProtobuf(response.getPipeline()); } else { String errorMessage = String.format("create replication pipeline " + "failed. 
code : %s Message: %s", response.getErrorCode(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 9bf0241..df1467b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -92,7 +92,7 @@ public final class ContainerProtocolCalls { .newBuilder() .setBlockID(datanodeBlockID) .setBlockCommitSequenceId(blockCommitSequenceId); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() @@ -125,7 +125,7 @@ public final class ContainerProtocolCalls { getBlockLengthRequestBuilder = ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder(). 
setBlockID(blockID.getDatanodeBlockIDProtobuf()); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto.newBuilder() .setCmdType(Type.GetCommittedBlockLength) @@ -152,7 +152,7 @@ public final class ContainerProtocolCalls { String traceID) throws IOException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock) .setContainerID(containerBlockData.getBlockID().getContainerID()) @@ -179,7 +179,7 @@ public final class ContainerProtocolCalls { .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) .setChunkData(chunk); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.ReadChunk) @@ -211,7 +211,7 @@ public final class ContainerProtocolCalls { .setBlockID(blockID.getDatanodeBlockIDProtobuf()) .setChunkData(chunk) .setData(data); - String id = xceiverClient.getPipeline().getLeader().getUuidString(); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.WriteChunk) @@ -260,7 +260,7 @@ public final class ContainerProtocolCalls { .setBlock(createBlockRequest).setData(ByteString.copyFrom(data)) .build(); - String id = client.getPipeline().getLeader().getUuidString(); + String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = 
ContainerCommandRequestProto.newBuilder() .setCmdType(Type.PutSmallFile) @@ -288,7 +288,7 @@ public final class ContainerProtocolCalls { createRequest.setContainerType(ContainerProtos.ContainerType .KeyValueContainer); - String id = client.getPipeline().getLeader().getUuidString(); + String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.CreateContainer); @@ -314,7 +314,7 @@ public final class ContainerProtocolCalls { ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest = ContainerProtos.DeleteContainerRequestProto.newBuilder(); deleteRequest.setForceDelete(force); - String id = client.getPipeline().getLeader().getUuidString(); + String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto.newBuilder(); @@ -338,7 +338,7 @@ public final class ContainerProtocolCalls { */ public static void closeContainer(XceiverClientSpi client, long containerID, String traceID) throws IOException { - String id = client.getPipeline().getLeader().getUuidString(); + String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto.newBuilder(); @@ -362,7 +362,7 @@ public final class ContainerProtocolCalls { public static ReadContainerResponseProto readContainer( XceiverClientSpi client, long containerID, String traceID) throws IOException { - String id = client.getPipeline().getLeader().getUuidString(); + String id = client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto.newBuilder(); @@ -396,7 +396,7 @@ public final class ContainerProtocolCalls { GetSmallFileRequestProto .newBuilder().setBlock(getBlock) .build(); - String id = client.getPipeline().getLeader().getUuidString(); + String id = 
client.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 2dbe2e6..1ff7695 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -19,7 +19,7 @@ package org.apache.ratis; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.ratis.client.RaftClient; @@ -40,6 +40,7 @@ import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -88,7 +89,7 @@ public interface RatisHelper { } static List<RaftPeer> toRaftPeers(Pipeline pipeline) { - return toRaftPeers(pipeline.getMachines()); + return toRaftPeers(pipeline.getNodes()); } static <E extends DatanodeDetails> List<RaftPeer> toRaftPeers( @@ -125,15 +126,15 @@ public interface RatisHelper { } static RaftGroup newRaftGroup(Pipeline pipeline) { - return RaftGroup.valueOf(pipeline.getId().getRaftGroupID(), + return RaftGroup.valueOf(RaftGroupId.valueOf(pipeline.getId().getId()), toRaftPeers(pipeline)); } static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline, - RetryPolicy retryPolicy) { - return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()), - newRaftGroup(pipeline.getId().getRaftGroupID(), 
pipeline.getMachines()), - retryPolicy); + RetryPolicy retryPolicy) throws IOException { + return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()), + newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()), + pipeline.getNodes()), retryPolicy); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 8ebfe49..ab9f42f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.common.helpers. 
StorageContainerException; import org.apache.hadoop.ozone.OzoneConfigKeys; http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index f0c2845..b5092d9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -372,7 +372,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID, RaftClientRequest.Type type) { return new RaftClientRequest(clientId, server.getId(), - PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(), + RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()), nextCallId(), 0, Message.valueOf(request.toByteString()), type); } @@ -405,7 +405,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { 
+ roleInfoProto.getRole()); } - PipelineID pipelineID = PipelineID.valueOf(groupId); + PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid()); ClosePipelineInfo.Builder closePipelineInfo = ClosePipelineInfo.newBuilder() .setPipelineID(pipelineID.getProtobuf()) @@ -429,8 +429,8 @@ public final class XceiverServerRatis implements XceiverServerSpi { List<PipelineReport> reports = new ArrayList<>(); for (RaftGroupId groupId : gids) { reports.add(PipelineReport.newBuilder() - .setPipelineID(PipelineID.valueOf(groupId).getProtobuf()) - .build()); + .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf()) + .build()); } return reports; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index c2c20a4..7849bcd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; /** * Asks datanode to close a container. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 049aa3f..681d021 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -305,7 +305,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline, HddsProtos.LifeCycleState state) throws IOException { ContainerInfo containerInfo = containerWithPipeline.getContainerInfo(); - if (containerWithPipeline.getPipeline().getDatanodes().size() == 0) { + if (containerWithPipeline.getPipeline().getNodes().size() == 0) { LOG.error("Pipeline Machine count is zero."); return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java index 5c112a0..70e9b5d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -31,7 +31,7 @@ import java.util.UUID; import java.util.stream.Collectors; import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; /** * A wrapper class to hold info about datanode and all deleted block @@ -74,7 +74,7 @@ public class DatanodeDeletedBlockTransactions { } boolean success = false; - for (DatanodeDetails dd : pipeline.getMachines()) { + for (DatanodeDetails dd : pipeline.getNodes()) { UUID dnID = dd.getUuid(); if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted.contains(dnID)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index 51790be..2a8a3e3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hdds.scm.command .CommandStatusReportHandler.DeleteBlockStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdfs.DFSUtil; @@ -261,8 +261,7 @@ public class DeletedBlockLogImpl Pipeline pipeline = containerManager.getContainerWithPipeline( ContainerID.valueof(containerId)).getPipeline(); - Collection<DatanodeDetails> containerDnsDetails = - 
pipeline.getDatanodes().values(); + Collection<DatanodeDetails> containerDnsDetails = pipeline.getNodes(); // The delete entry can be safely removed from the log if all the // corresponding nodes commit the txn. It is required to check that // the nodes returned in the pipeline match the replication factor. http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 74edbc2..69574a9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; @@ -123,7 +123,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { info.getReplicationType(), info.getPipelineID()); Pipeline pipeline = containerWithPipeline.getPipeline(); - pipeline.getMachines().stream() + pipeline.getNodes().stream() .map(node -> new CommandForDatanode<>(node.getUuid(), closeContainerCommand)) .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command)); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 5dba8fd..0a48915 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import java.io.Closeable; import java.io.IOException; @@ -73,7 +73,7 @@ public interface ContainerManager extends Closeable { * @throws IOException */ ContainerWithPipeline getContainerWithPipeline(ContainerID containerID) - throws ContainerNotFoundException; + throws ContainerNotFoundException, PipelineNotFoundException; /** * Returns containers under certain conditions. 
@@ -175,6 +175,4 @@ public interface ContainerManager extends Closeable { ContainerWithPipeline getMatchingContainerWithPipeline(long size, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException; - - PipelineSelector getPipelineSelector(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 42b39f9..87505c3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -22,11 +22,11 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Map; import 
java.util.NavigableSet; import java.util.Set; @@ -232,19 +233,28 @@ public class ContainerStateManager { /** * Allocates a new container based on the type, replication etc. * - * @param selector -- Pipeline selector class. + * @param pipelineManager -- Pipeline Manager class. * @param type -- Replication type. * @param replicationFactor - Replication replicationFactor. * @return ContainerWithPipeline * @throws IOException on Failure. */ - ContainerInfo allocateContainer(final PipelineSelector selector, + ContainerInfo allocateContainer(final PipelineManager pipelineManager, final HddsProtos.ReplicationType type, final HddsProtos.ReplicationFactor replicationFactor, final String owner) throws IOException { - final Pipeline pipeline = selector.getReplicationPipeline(type, - replicationFactor); + Pipeline pipeline; + try { + pipeline = pipelineManager.createPipeline(type, replicationFactor); + } catch (IOException e) { + final List<Pipeline> pipelines = + pipelineManager.getPipelines(type, replicationFactor); + if (pipelines.isEmpty()) { + throw new IOException("Could not allocate container"); + } + pipeline = pipelines.get((int) containerCount.get() % pipelines.size()); + } Preconditions.checkNotNull(pipeline, "Pipeline type=%s/" + "replication=%s couldn't be found for the new container. 
" @@ -263,7 +273,8 @@ public class ContainerStateManager { .setReplicationFactor(replicationFactor) .setReplicationType(pipeline.getType()) .build(); - selector.addContainerToPipeline(pipeline.getId(), containerID); + pipelineManager.addContainerToPipeline(pipeline.getId(), + ContainerID.valueof(containerID)); Preconditions.checkNotNull(containerInfo); containers.addContainer(containerInfo); LOG.trace("New container allocated: {}", containerInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 96ad731..1666b7c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -26,13 +26,13 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import 
org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -86,7 +86,7 @@ public class SCMContainerManager implements ContainerManager { private final Lock lock; private final MetadataStore containerStore; - private final PipelineSelector pipelineSelector; + private final PipelineManager pipelineManager; private final ContainerStateManager containerStateManager; private final LeaseManager<ContainerInfo> containerLeaseManager; private final EventPublisher eventPublisher; @@ -102,12 +102,13 @@ public class SCMContainerManager implements ContainerManager { * passed to LevelDB and this memory is allocated in Native code space. * CacheSize is specified * in MB. + * @param pipelineManager - PipelineManager * @throws IOException on Failure. */ @SuppressWarnings("unchecked") public SCMContainerManager(final Configuration conf, - final NodeManager nodeManager, final EventPublisher eventPublisher) - throws IOException { + final NodeManager nodeManager, PipelineManager pipelineManager, + final EventPublisher eventPublisher) throws IOException { final File metaDir = getOzoneMetaDirPath(conf); final File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); @@ -123,8 +124,7 @@ public class SCMContainerManager implements ContainerManager { this.lock = new ReentrantLock(); this.size = (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE, OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - this.pipelineSelector = new PipelineSelector(nodeManager, - conf, eventPublisher, cacheSize); + this.pipelineManager = pipelineManager; this.containerStateManager = new ContainerStateManager(conf); this.eventPublisher = eventPublisher; @@ -147,8 +147,10 @@ public class SCMContainerManager implements ContainerManager { HddsProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue())); 
Preconditions.checkNotNull(container); containerStateManager.loadContainer(container); - pipelineSelector.addContainerToPipeline( - container.getPipelineID(), container.getContainerID()); + if (container.isOpen()) { + pipelineManager.addContainerToPipeline(container.getPipelineID(), + ContainerID.valueof(container.getContainerID())); + } } } @@ -214,28 +216,23 @@ public class SCMContainerManager implements ContainerManager { */ @Override public ContainerWithPipeline getContainerWithPipeline(ContainerID containerID) - throws ContainerNotFoundException { + throws ContainerNotFoundException, PipelineNotFoundException { lock.lock(); try { final ContainerInfo contInfo = getContainer(containerID); Pipeline pipeline; - String leaderId = ""; if (contInfo.isOpen()) { // If pipeline with given pipeline Id already exist return it - pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); + pipeline = pipelineManager.getPipeline(contInfo.getPipelineID()); } else { // For close containers create pipeline from datanodes with replicas Set<ContainerReplica> dnWithReplicas = containerStateManager .getContainerReplicas(contInfo.containerID()); - if (!dnWithReplicas.isEmpty()) { - leaderId = dnWithReplicas.iterator().next() - .getDatanodeDetails().getUuidString(); - } - pipeline = new Pipeline(leaderId, contInfo.getState(), - ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), - PipelineID.randomId()); - dnWithReplicas.stream().map(ContainerReplica::getDatanodeDetails). 
- forEach(pipeline::addMember); + List<DatanodeDetails> dns = + dnWithReplicas.stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + pipeline = pipelineManager.createPipeline(ReplicationType.STAND_ALONE, + contInfo.getReplicationFactor(), dns); } return new ContainerWithPipeline(contInfo, pipeline); } finally { @@ -290,8 +287,8 @@ public class SCMContainerManager implements ContainerManager { lock.lock(); try { final ContainerInfo containerInfo; containerInfo = containerStateManager - .allocateContainer(pipelineSelector, type, replicationFactor, owner); - final Pipeline pipeline = pipelineSelector.getPipeline( + .allocateContainer(pipelineManager, type, replicationFactor, owner); + final Pipeline pipeline = pipelineManager.getPipeline( containerInfo.getPipelineID()); try { @@ -360,8 +357,8 @@ public class SCMContainerManager implements ContainerManager { ContainerInfo updatedContainer = updateContainerStateInternal(containerID, event); if (!updatedContainer.isOpen()) { - pipelineSelector.removeContainerFromPipeline( - updatedContainer.getPipelineID(), containerID.getId()); + pipelineManager.removeContainerFromPipeline( + updatedContainer.getPipelineID(), containerID); } final byte[] dbKey = Longs.toByteArray(containerID.getId()); containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); @@ -485,7 +482,7 @@ public class SCMContainerManager implements ContainerManager { if (containerInfo == null) { return null; } - Pipeline pipeline = pipelineSelector + Pipeline pipeline = pipelineManager .getPipeline(containerInfo.getPipelineID()); return new ContainerWithPipeline(containerInfo, pipeline); } @@ -647,13 +644,5 @@ public class SCMContainerManager implements ContainerManager { if (containerStore != null) { containerStore.close(); } - - if (pipelineSelector != null) { - pipelineSelector.shutdown(); - } - } - - public PipelineSelector getPipelineSelector() { - return pipelineSelector; } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 77b8713..30a7c34 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler .CloseContainerRetryableReq; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher @@ -103,14 +102,6 @@ public final class SCMEvents { "Pipeline_Actions"); /** - * Pipeline close event are triggered to close pipeline because of failure, - * stale node, decommissioning etc. - */ - public static final TypedEvent<PipelineID> - PIPELINE_CLOSE = new TypedEvent<>(PipelineID.class, - "Pipeline_Close"); - - /** * A Command status report will be sent by datanodes. This repoort is received * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 5f6a2e4..d55ff98 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 88f984b..588756c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -25,8 +25,8 @@ import 
org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/874e06e5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index acec6aa..35c22f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -22,8 +22,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; 
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org