HDDS-844. Add logic for pipeline teardown after timeout. Contributed by Lokesh Jain.
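Summary of the change: when SCM needs to tear a Ratis pipeline down (a datanode goes stale, or a datanode reports a close-pipeline action), it now finalizes the pipeline immediately but defers the Ratis groupRemove on the datanodes until ozone.scm.pipeline.destroy.timeout (default 300s) has elapsed, so that containers still open on the pipeline can be closed first. The new RatisPipelineUtils#finalizeAndDestroyPipeline in the diff below implements this with Ratis' TimeoutScheduler. The following is a simplified, self-contained sketch of that schedule-then-destroy pattern using a plain ScheduledExecutorService instead of the TimeoutScheduler the patch actually uses; the class and method names in the sketch are illustrative stand-ins, not the real SCM types.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Simplified illustration of the teardown-after-timeout pattern added by
 * HDDS-844. The real implementation is
 * RatisPipelineUtils#finalizeAndDestroyPipeline(pipelineManager, pipeline,
 * conf, onTimeout) in the diff below.
 */
public class PipelineTeardownSketch {

  // Mirrors ozone.scm.pipeline.destroy.timeout (default "300s" in this patch).
  private static final long DESTROY_TIMEOUT_MS = 300_000L;

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /** Stand-in for PipelineManager#finalizePipeline: stop using the pipeline. */
  private void finalizePipeline(String pipelineId) {
    System.out.println("Finalized pipeline " + pipelineId);
  }

  /** Stand-in for the Ratis groupRemove sent to each datanode. */
  private void destroyPipeline(String pipelineId) {
    System.out.println("Destroyed pipeline " + pipelineId + " on datanodes");
  }

  /**
   * Finalize now; destroy immediately, or after the configured timeout so
   * that containers still open on the pipeline can be closed first.
   */
  public void finalizeAndDestroy(String pipelineId, boolean onTimeout) {
    finalizePipeline(pipelineId);
    if (onTimeout) {
      scheduler.schedule(() -> destroyPipeline(pipelineId),
          DESTROY_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } else {
      destroyPipeline(pipelineId);
    }
  }
}
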
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cfb915f3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cfb915f3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cfb915f3 Branch: refs/heads/HDFS-13891 Commit: cfb915f3df43c84d3fb36eb2a430a25946b2ddb7 Parents: be0708c Author: Mukul Kumar Singh <[email protected]> Authored: Mon Nov 19 12:11:05 2018 +0530 Committer: Mukul Kumar Singh <[email protected]> Committed: Mon Nov 19 12:11:05 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 13 -- .../hadoop/hdds/scm/XceiverClientRatis.java | 48 ----- .../scm/client/ContainerOperationClient.java | 3 +- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 8 +- .../hadoop/hdds/scm/XceiverClientSpi.java | 11 -- .../common/src/main/resources/ozone-default.xml | 13 +- .../hadoop/hdds/scm/node/StaleNodeHandler.java | 14 +- .../scm/pipeline/PipelineActionHandler.java | 26 +-- .../scm/pipeline/PipelineReportHandler.java | 12 +- .../hdds/scm/pipeline/PipelineStateMap.java | 6 - .../scm/pipeline/RatisPipelineProvider.java | 10 +- .../hdds/scm/pipeline/RatisPipelineUtils.java | 176 +++++++++++++++++++ .../scm/server/StorageContainerManager.java | 4 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 30 +++- .../hdds/scm/pipeline/TestPipelineClose.java | 138 +++++++++++---- .../scm/pipeline/TestPipelineStateManager.java | 14 -- .../scm/pipeline/TestSCMPipelineManager.java | 4 +- .../freon/TestFreonWithPipelineDestroy.java | 109 ++++++++++++ 18 files changed, 464 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index bbd3340..c6b19ab 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -288,19 +288,6 @@ public class XceiverClientGrpc extends XceiverClientSpi { } } - /** - * Create a pipeline. - */ - @Override - public void createPipeline() { - // For stand alone pipeline, there is no notion called setup pipeline. - } - - @Override - public void destroyPipeline() { - // For stand alone pipeline, there is no notion called destroy pipeline. 
- } - @Override public void watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index f6083ec..6b3b001 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.protobuf @@ -27,7 +26,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -36,19 +34,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.ratis.RatisHelper; import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.util.CheckedBiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Objects; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -97,22 +90,6 @@ public final class XceiverClientRatis extends XceiverClientSpi { this.retryPolicy = retryPolicy; } - @Override - public void createPipeline() throws IOException { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getNodes(), - (raftClient, peer) -> raftClient.groupAdd(group, peer.getId())); - } - - @Override - public void destroyPipeline() throws IOException { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getNodes(), (raftClient, peer) -> raftClient - .groupRemove(group.getGroupId(), true, peer.getId())); - } - /** * Returns Ratis as pipeline Type. 
* @@ -123,31 +100,6 @@ public final class XceiverClientRatis extends XceiverClientSpi { return HddsProtos.ReplicationType.RATIS; } - private void callRatisRpc(List<DatanodeDetails> datanodes, - CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc) - throws IOException { - if (datanodes.isEmpty()) { - return; - } - - final List<IOException> exceptions = - Collections.synchronizedList(new ArrayList<>()); - datanodes.parallelStream().forEach(d -> { - final RaftPeer p = RatisHelper.toRaftPeer(d); - try (RaftClient client = RatisHelper - .newRaftClient(rpcType, p, retryPolicy)) { - rpc.accept(client, p); - } catch (IOException ioe) { - exceptions.add( - new IOException("Failed invoke Ratis rpc " + rpc + " for " + d, - ioe)); - } - }); - if (!exceptions.isEmpty()) { - throw MultipleIOException.createIOException(exceptions); - } - } - @Override public Pipeline getPipeline() { return pipeline; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index 8e9f4ad..ef72e38 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -157,7 +157,8 @@ public class ContainerOperationClient implements ScmClient { // ObjectStageChangeRequestProto.Op.create, // ObjectStageChangeRequestProto.Stage.begin); - client.createPipeline(); + // client.createPipeline(); + // TODO: Use PipelineManager to createPipeline //storageContainerLocationClient.notifyObjectStageChange( // ObjectStageChangeRequestProto.Type.pipeline, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 595135d..896caed 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -289,11 +289,11 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s"; - public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT = - "ozone.scm.pipeline.creation.lease.timeout"; + public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT = + "ozone.scm.pipeline.destroy.timeout"; - public static final String - OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s"; + public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT = + "300s"; public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY = "ozone.scm.block.deletion.max.retry"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 9eb49ae..7000660 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -119,17 +119,6 @@ public abstract class XceiverClientSpi implements Closeable { throws IOException, ExecutionException, InterruptedException; /** - * Create a pipeline. - */ - public abstract void createPipeline() throws IOException; - - /** - * Destroy a pipeline. - * @throws IOException - */ - public abstract void destroyPipeline() throws IOException; - - /** * Returns pipeline Type. * * @return - {Stand_Alone, Ratis or Chained} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index b31e490..865335c 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -821,6 +821,8 @@ OM/SCM eventually. So a 30 second HB seems to work. This assumes that replication strategy used is Ratis if not, this value should be set to something smaller like 3 seconds. + ozone.scm.pipeline.close.timeout should also be adjusted accordingly, + if the default value for this config is not used. </description> </property> <property> @@ -1183,15 +1185,12 @@ postfix (ns,ms,s,m,h,d)</description> </property> <property> - <name>ozone.scm.pipeline.creation.lease.timeout</name> - <value>60s</value> + <name>ozone.scm.pipeline.destroy.timeout</name> + <value>300s</value> <tag>OZONE, SCM, PIPELINE</tag> <description> - Pipeline creation timeout in milliseconds to be used by SCM. When - BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to - CREATING state, SCM will now wait for the configured amount of time - to get COMPLETE_CREATE event if it doesn't receive it will move the - pipeline to DELETING. + Once a pipeline is closed, SCM should wait for the above configured time + before destroying a pipeline. 
</description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index 268fe5b..4055449 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hdds.scm.node; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; @@ -38,21 +42,25 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> { private final NodeManager nodeManager; private final PipelineManager pipelineManager; + private final Configuration conf; public StaleNodeHandler(NodeManager nodeManager, - PipelineManager pipelineManager) { + PipelineManager pipelineManager, OzoneConfiguration conf) { this.nodeManager = nodeManager; this.pipelineManager = pipelineManager; + this.conf = conf; } @Override public void onMessage(DatanodeDetails datanodeDetails, - EventPublisher publisher) { + EventPublisher publisher) { Set<PipelineID> pipelineIds = nodeManager.getPipelines(datanodeDetails); for (PipelineID pipelineID : pipelineIds) { try { - pipelineManager.finalizePipeline(pipelineID); + Pipeline pipeline = pipelineManager.getPipeline(pipelineID); + RatisPipelineUtils + .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true); } catch (IOException e) { LOG.info("Could not finalize pipeline={} for dn={}", pipelineID, datanodeDetails); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java index a44ce9d..c467b9e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -17,10 +17,10 @@ package org.apache.hadoop.hdds.scm.pipeline; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineAction; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .PipelineActionsFromDatanode; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import 
org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -32,16 +32,19 @@ import java.io.IOException; /** * Handles pipeline actions from datanode. */ -public class PipelineActionHandler implements - EventHandler<PipelineActionsFromDatanode> { +public class PipelineActionHandler + implements EventHandler<PipelineActionsFromDatanode> { - public static final Logger LOG = LoggerFactory.getLogger( - PipelineActionHandler.class); + public static final Logger LOG = + LoggerFactory.getLogger(PipelineActionHandler.class); private final PipelineManager pipelineManager; + private final Configuration ozoneConf; - public PipelineActionHandler(PipelineManager pipelineManager) { + public PipelineActionHandler(PipelineManager pipelineManager, + OzoneConfiguration conf) { this.pipelineManager = pipelineManager; + this.ozoneConf = conf; } @Override @@ -53,7 +56,10 @@ public class PipelineActionHandler implements try { pipelineID = PipelineID. getFromProtobuf(action.getClosePipeline().getPipelineID()); - pipelineManager.finalizePipeline(pipelineID); + Pipeline pipeline = pipelineManager.getPipeline(pipelineID); + RatisPipelineUtils + .finalizeAndDestroyPipeline(pipelineManager, pipeline, ozoneConf, + true); } catch (IOException ioe) { LOG.error("Could not execute pipeline action={} pipeline={} {}", action, pipelineID, ioe); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 6c31a12..2d4bae1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -76,11 +75,11 @@ public class PipelineReportHandler implements private void processPipelineReport(PipelineReport report, DatanodeDetails dn) throws IOException { PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID()); - Pipeline pipeline = null; + Pipeline pipeline; try { pipeline = pipelineManager.getPipeline(pipelineID); } catch (PipelineNotFoundException e) { - //TODO: introduce per datanode command for pipeline destroy + RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf); return; } @@ -93,14 +92,9 @@ public class PipelineReportHandler implements } else if (pipeline.isClosed()) { int numContainers = pipelineManager.getNumberOfContainers(pipelineID); if (numContainers == 0) { - // remove the pipeline from the pipeline manager - pipelineManager.removePipeline(pipelineID); // since all the containers have been closed the pipeline can be // destroyed - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.destroyPipeline(); - } + 
RatisPipelineUtils.destroyPipeline(pipelineManager, pipeline, conf); } } else { // In OPEN state case just report the datanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 85790b2..20dfa03 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -242,12 +242,6 @@ class PipelineStateMap { String.format("Pipeline with %s is not yet closed", pipelineID)); } - Set<ContainerID> containerIDs = pipeline2container.get(pipelineID); - if (containerIDs.size() != 0) { - throw new IOException( - String.format("Pipeline with %s is not empty", pipelineID)); - } - pipelineMap.remove(pipelineID); pipeline2container.remove(pipelineID); return pipeline; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 590cd27..3a5bd41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -133,12 +132,7 @@ public class RatisPipelineProvider implements PipelineProvider { .build(); } - private void initializePipeline(Pipeline pipeline) - throws IOException { - // TODO: remove old code in XceiverClientRatis#newXceiverClientRatis - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.createPipeline(); - } + private void initializePipeline(Pipeline pipeline) throws IOException { + RatisPipelineUtils.createPipeline(pipeline, conf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java new file mode 100644 index 0000000..dd79962 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java 
@@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.CheckedBiConsumer; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.TimeoutScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for Ratis pipelines. Contains methods to create and destroy + * ratis pipelines. + */ +public final class RatisPipelineUtils { + + private static TimeoutScheduler timeoutScheduler = + TimeoutScheduler.newInstance(1); + + private static final Logger LOG = + LoggerFactory.getLogger(RatisPipelineUtils.class); + + private RatisPipelineUtils() { + } + + /** + * Sends ratis command to create pipeline on all the datanodes. + * @param pipeline - Pipeline to be created + * @param ozoneConf - Ozone Confinuration + * @throws IOException if creation fails + */ + public static void createPipeline(Pipeline pipeline, Configuration ozoneConf) + throws IOException { + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); + callRatisRpc(pipeline.getNodes(), ozoneConf, + (raftClient, peer) -> raftClient.groupAdd(group, peer.getId())); + } + + /** + * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all + * the datanodes. + * @param pipelineManager - SCM pipeline manager + * @param pipeline - Pipeline to be destroyed + * @param ozoneConf - Ozone configuration + * @throws IOException + */ + static void destroyPipeline(PipelineManager pipelineManager, + Pipeline pipeline, Configuration ozoneConf) throws IOException { + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); + // remove the pipeline from the pipeline manager + pipelineManager.removePipeline(pipeline.getId()); + for (DatanodeDetails dn : pipeline.getNodes()) { + destroyPipeline(dn, pipeline.getId(), ozoneConf); + } + } + + /** + * Finalizes pipeline in the SCM. 
Removes pipeline and sends ratis command to + * destroy pipeline on the datanodes immediately or after timeout based on the + * value of onTimeout parameter. + * @param pipelineManager - SCM pipeline manager + * @param pipeline - Pipeline to be destroyed + * @param ozoneConf - Ozone Configuration + * @param onTimeout - if true pipeline is removed and destroyed on datanodes + * after timeout + * @throws IOException + */ + public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager, + Pipeline pipeline, Configuration ozoneConf, boolean onTimeout) + throws IOException { + final RaftGroup group = RatisHelper.newRaftGroup(pipeline); + LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); + pipelineManager.finalizePipeline(pipeline.getId()); + if (onTimeout) { + long pipelineDestroyTimeoutInMillis = ozoneConf + .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + TimeDuration timeoutDuration = TimeDuration + .valueOf(pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS); + timeoutScheduler.onTimeout(timeoutDuration, + () -> destroyPipeline(pipelineManager, pipeline, ozoneConf), LOG, + () -> String.format("Destroy pipeline failed for pipeline:%s with %s", + pipeline.getId(), group)); + } else { + destroyPipeline(pipelineManager, pipeline, ozoneConf); + } + } + + /** + * Sends ratis command to destroy pipeline on the given datanode. + * @param dn - Datanode on which pipeline needs to be destroyed + * @param pipelineID - ID of pipeline to be destroyed + * @param ozoneConf - Ozone configuration + * @throws IOException + */ + static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, + Configuration ozoneConf) throws IOException { + final String rpcType = ozoneConf + .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); + final RaftPeer p = RatisHelper.toRaftPeer(dn); + RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, + retryPolicy); + client + .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId()); + } + + private static void callRatisRpc(List<DatanodeDetails> datanodes, + Configuration ozoneConf, + CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc) + throws IOException { + if (datanodes.isEmpty()) { + return; + } + + final String rpcType = ozoneConf + .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); + final List<IOException> exceptions = + Collections.synchronizedList(new ArrayList<>()); + + datanodes.parallelStream().forEach(d -> { + final RaftPeer p = RatisHelper.toRaftPeer(d); + try (RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, + retryPolicy)) { + rpc.accept(client, p); + } catch (IOException ioe) { + exceptions.add( + new IOException("Failed invoke Ratis rpc " + rpc + " for " + d, + ioe)); + } + }); + if (!exceptions.isEmpty()) { + throw MultipleIOException.createIOException(exceptions); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 08ee382..fe00b54 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -223,7 +223,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl NewNodeHandler newNodeHandler = new NewNodeHandler(); StaleNodeHandler staleNodeHandler = - new StaleNodeHandler(scmNodeManager, pipelineManager); + new StaleNodeHandler(scmNodeManager, pipelineManager, conf); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, containerManager); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); @@ -239,7 +239,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl pipelineManager, containerManager); PipelineActionHandler pipelineActionHandler = - new PipelineActionHandler(pipelineManager); + new PipelineActionHandler(pipelineManager, conf); long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 8fbe7fb..debde65 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -17,12 +17,17 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -305,19 +310,34 @@ public final class TestUtils { return PipelineReportsProto.newBuilder().build(); } - public static PipelineReportFromDatanode getRandomPipelineReportFromDatanode( - DatanodeDetails dn, - org.apache.hadoop.hdds.scm.pipeline.PipelineID... pipelineIDs) { + public static PipelineReportFromDatanode getPipelineReportFromDatanode( + DatanodeDetails dn, PipelineID... 
pipelineIDs) { PipelineReportsProto.Builder reportBuilder = PipelineReportsProto.newBuilder(); - for (org.apache.hadoop.hdds.scm.pipeline.PipelineID pipelineID : - pipelineIDs) { + for (PipelineID pipelineID : pipelineIDs) { reportBuilder.addPipelineReport( PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf())); } return new PipelineReportFromDatanode(dn, reportBuilder.build()); } + public static PipelineActionsFromDatanode getPipelineActionFromDatanode( + DatanodeDetails dn, PipelineID... pipelineIDs) { + PipelineActionsProto.Builder actionsProtoBuilder = + PipelineActionsProto.newBuilder(); + for (PipelineID pipelineID : pipelineIDs) { + ClosePipelineInfo closePipelineInfo = + ClosePipelineInfo.newBuilder().setPipelineID(pipelineID.getProtobuf()) + .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED) + .setDetailedReason("").build(); + actionsProtoBuilder.addPipelineActions(PipelineAction.newBuilder() + .setClosePipeline(closePipelineInfo) + .setAction(PipelineAction.Action.CLOSE) + .build()); + } + return new PipelineActionsFromDatanode(dn, actionsProtoBuilder.build()); + } + /** * Creates container report with the given ContainerInfo(s). * http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 4f8943e..356f005 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -17,22 +17,32 @@ */ package org.apache.hadoop.hdds.scm.pipeline; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.junit.AfterClass; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; @@ -43,35 +53,36 @@ import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.R */ public class TestPipelineClose { - private static MiniOzoneCluster cluster; - private static OzoneConfiguration conf; - private static StorageContainerManager scm; - private static ContainerWithPipeline ratisContainer1; - private static ContainerWithPipeline ratisContainer2; - private static ContainerManager containerManager; - private static PipelineManager pipelineManager; + private MiniOzoneCluster cluster; + private OzoneConfiguration conf; + private StorageContainerManager scm; + private ContainerWithPipeline ratisContainer; + private ContainerManager containerManager; + private PipelineManager pipelineManager; + private long pipelineDestroyTimeoutInMillis; /** * Create a MiniDFSCluster for testing. * * @throws IOException */ - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { conf = new OzoneConfiguration(); - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(6).build(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build(); + conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000, + TimeUnit.MILLISECONDS); + pipelineDestroyTimeoutInMillis = 5000; + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, + pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS); cluster.waitForClusterToBeReady(); scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); pipelineManager = scm.getPipelineManager(); - ContainerInfo containerInfo1 = containerManager + ContainerInfo containerInfo = containerManager .allocateContainer(RATIS, THREE, "testOwner"); - ratisContainer1 = new ContainerWithPipeline(containerInfo1, - pipelineManager.getPipeline(containerInfo1.getPipelineID())); - ContainerInfo containerInfo2 = containerManager - .allocateContainer(RATIS, THREE, "testOwner"); - ratisContainer2 = new ContainerWithPipeline(containerInfo2, - pipelineManager.getPipeline(containerInfo2.getPipelineID())); + ratisContainer = new ContainerWithPipeline(containerInfo, + pipelineManager.getPipeline(containerInfo.getPipelineID())); pipelineManager = scm.getPipelineManager(); // At this stage, there should be 2 pipeline one with 1 open container each. // Try closing the both the pipelines, one with a closed container and @@ -81,8 +92,8 @@ public class TestPipelineClose { /** * Shutdown MiniDFSCluster. 
*/ - @AfterClass - public static void shutdown() { + @After + public void shutdown() { if (cluster != null) { cluster.shutdown(); } @@ -91,9 +102,9 @@ public class TestPipelineClose { @Test public void testPipelineCloseWithClosedContainer() throws IOException { Set<ContainerID> set = pipelineManager - .getContainersInPipeline(ratisContainer1.getPipeline().getId()); + .getContainersInPipeline(ratisContainer.getPipeline().getId()); - ContainerID cId = ratisContainer1.getContainerInfo().containerID(); + ContainerID cId = ratisContainer.getContainerInfo().containerID(); Assert.assertEquals(1, set.size()); set.forEach(containerID -> Assert.assertEquals(containerID, cId)); @@ -105,16 +116,16 @@ public class TestPipelineClose { .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); Set<ContainerID> setClosed = pipelineManager - .getContainersInPipeline(ratisContainer1.getPipeline().getId()); + .getContainersInPipeline(ratisContainer.getPipeline().getId()); Assert.assertEquals(0, setClosed.size()); - pipelineManager.finalizePipeline(ratisContainer1.getPipeline().getId()); + pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId()); Pipeline pipeline1 = pipelineManager - .getPipeline(ratisContainer1.getPipeline().getId()); + .getPipeline(ratisContainer.getPipeline().getId()); Assert.assertEquals(Pipeline.PipelineState.CLOSED, pipeline1.getPipelineState()); pipelineManager.removePipeline(pipeline1.getId()); - for (DatanodeDetails dn : ratisContainer1.getPipeline().getNodes()) { + for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) { // Assert that the pipeline has been removed from Node2PipelineMap as well Assert.assertEquals(scm.getScmNodeManager().getPipelines( dn).size(), 0); @@ -125,17 +136,80 @@ public class TestPipelineClose { public void testPipelineCloseWithOpenContainer() throws IOException, TimeoutException, InterruptedException { Set<ContainerID> setOpen = pipelineManager.getContainersInPipeline( - ratisContainer2.getPipeline().getId()); + ratisContainer.getPipeline().getId()); Assert.assertEquals(1, setOpen.size()); - ContainerID cId2 = ratisContainer2.getContainerInfo().containerID(); - pipelineManager.finalizePipeline(ratisContainer2.getPipeline().getId()); + ContainerID cId2 = ratisContainer.getContainerInfo().containerID(); + pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId()); Assert.assertEquals(Pipeline.PipelineState.CLOSED, pipelineManager.getPipeline( - ratisContainer2.getPipeline().getId()).getPipelineState()); + ratisContainer.getPipeline().getId()).getPipelineState()); Pipeline pipeline2 = pipelineManager - .getPipeline(ratisContainer2.getPipeline().getId()); + .getPipeline(ratisContainer.getPipeline().getId()); Assert.assertEquals(Pipeline.PipelineState.CLOSED, pipeline2.getPipelineState()); } + + @Test + public void testPipelineCloseWithPipelineAction() throws Exception { + List<DatanodeDetails> dns = ratisContainer.getPipeline().getNodes(); + PipelineActionsFromDatanode + pipelineActionsFromDatanode = TestUtils + .getPipelineActionFromDatanode(dns.get(0), + ratisContainer.getPipeline().getId()); + // send closing action for pipeline + PipelineActionHandler pipelineActionHandler = + new PipelineActionHandler(pipelineManager, conf); + pipelineActionHandler + .onMessage(pipelineActionsFromDatanode, new EventQueue()); + Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2)); + OzoneContainer ozoneContainer = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer(); + List<PipelineReport> 
pipelineReports = + ozoneContainer.getPipelineReport().getPipelineReportList(); + for (PipelineReport pipelineReport : pipelineReports) { + // ensure the pipeline is not reported by any dn + Assert.assertNotEquals( + PipelineID.getFromProtobuf(pipelineReport.getPipelineID()), + ratisContainer.getPipeline().getId()); + } + + try { + pipelineManager.getPipeline(ratisContainer.getPipeline().getId()); + Assert.fail("Pipeline should not exist in SCM"); + } catch (PipelineNotFoundException e) { + } + } + + @Test + public void testPipelineCloseWithPipelineReport() throws IOException { + Pipeline pipeline = ratisContainer.getPipeline(); + pipelineManager.finalizePipeline(pipeline.getId()); + // remove pipeline from SCM + pipelineManager.removePipeline(pipeline.getId()); + + for (DatanodeDetails dn : pipeline.getNodes()) { + PipelineReportFromDatanode pipelineReport = + TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); + PipelineReportHandler pipelineReportHandler = + new PipelineReportHandler(pipelineManager, conf); + // on receiving pipeline report for the pipeline, pipeline report handler + // should destroy the pipeline for the dn + pipelineReportHandler.onMessage(pipelineReport, new EventQueue()); + } + + OzoneContainer ozoneContainer = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer(); + List<PipelineReport> pipelineReports = + ozoneContainer.getPipelineReport().getPipelineReportList(); + for (PipelineReport pipelineReport : pipelineReports) { + // pipeline should not be reported by any dn + Assert.assertNotEquals( + PipelineID.getFromProtobuf(pipelineReport.getPipelineID()), + ratisContainer.getPipeline().getId()); + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index 0f5692e..d404b84 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -323,15 +323,6 @@ public class TestPipelineStateManager { // close the pipeline stateManager.finalizePipeline(pipeline.getId()); - - try { - stateManager.removePipeline(pipeline.getId()); - Assert.fail("Pipeline should not have been removed"); - } catch (IOException e) { - // can not remove a pipeline which already has containers - Assert.assertTrue(e.getMessage().contains("not empty")); - } - // remove containers and then remove the pipeline removePipeline(pipeline); } @@ -423,11 +414,6 @@ public class TestPipelineStateManager { private void removePipeline(Pipeline pipeline) throws IOException { stateManager.finalizePipeline(pipeline.getId()); - Set<ContainerID> containerIDs = - stateManager.getContainers(pipeline.getId()); - for (ContainerID containerID : containerIDs) { - stateManager.removeContainerFromPipeline(pipeline.getId(), containerID); - } stateManager.removePipeline(pipeline.getId()); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 4487766..ad53015 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -148,7 +148,7 @@ public class TestSCMPipelineManager { new PipelineReportHandler(pipelineManager, conf); for (DatanodeDetails dn: pipeline.getNodes()) { PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId()); + TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); // pipeline is not healthy until all dns report Assert.assertFalse( pipelineManager.getPipeline(pipeline.getId()).isHealthy()); @@ -168,7 +168,7 @@ public class TestSCMPipelineManager { for (DatanodeDetails dn: pipeline.getNodes()) { PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId()); + TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); // pipeline report for a closed pipeline should destroy the pipeline // and remove it from the pipeline manager pipelineReportHandler http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfb915f3/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java new file mode 100644 index 0000000..280cf90 --- /dev/null +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.freon; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests Freon with Pipeline destroy. + */ +public class TestFreonWithPipelineDestroy { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + /** + * Create a MiniDFSCluster for testing. + * <p> + * Ozone is made active by setting OZONE_ENABLED = true + * + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf) + .setHbProcessorInterval(1000) + .setHbInterval(1000) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testRestart() throws Exception { + startFreon(); + destroyPipeline(); + startFreon(); + } + + private void startFreon() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(1); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(1); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setKeySize(20971520); + randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.call(); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertEquals(0, + randomKeyGenerator.getUnsuccessfulValidationCount()); + } + + private void destroyPipeline() throws Exception { + XceiverServerSpi server = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). + getContainer().getWriteChannel(); + StorageContainerDatanodeProtocolProtos.PipelineReport report = + server.getPipelineReport().get(0); + PipelineID id = PipelineID.getFromProtobuf(report.getPipelineID()); + PipelineManager pipelineManager = + cluster.getStorageContainerManager().getPipelineManager(); + Pipeline pipeline = pipelineManager.getPipeline(id); + RatisPipelineUtils + .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
