This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit f7b9ad365cd811b785db5a738241f139832b1fc2
Author: Sammi Chen <sammic...@apache.org>
AuthorDate: Mon Sep 23 20:24:19 2019 +0800

    HDDS-2034. Sync RATIS pipeline creation and destruction through heartbeat commands.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java | 12 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 7 +
 .../common/src/main/resources/ozone-default.xml | 30 ++-
 hadoop-hdds/container-service/pom.xml | 5 +
 .../common/statemachine/DatanodeStateMachine.java | 14 ++
 .../common/statemachine/StateContext.java | 10 +
 .../CloseContainerCommandHandler.java | 11 +-
 .../ClosePipelineCommandHandler.java | 166 +++++++++++++++
 .../commandhandler/CommandHandler.java | 2 +-
 .../CreatePipelineCommandHandler.java | 228 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java | 22 ++
 .../protocol/commands/ClosePipelineCommand.java | 73 +++++++
 .../protocol/commands/CreatePipelineCommand.java | 100 +++++++++
 .../commands/CreatePipelineCommandStatus.java | 97 +++++++++
 .../proto/StorageContainerDatanodeProtocol.proto | 31 +++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java | 29 +++
 .../scm/command/CommandStatusReportHandler.java | 24 ++-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java | 22 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java | 13 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java | 11 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java | 2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java | 32 +--
 .../hdds/scm/pipeline/PipelineStateManager.java | 2 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java | 124 ++++------
 .../hdds/scm/pipeline/RatisPipelineUtils.java | 101 ---------
 .../hdds/scm/pipeline/SCMPipelineManager.java | 119 +++++++++--
 .../hdds/scm/pipeline/SCMPipelineMetrics.java | 11 +
 .../hdds/scm/pipeline/SimplePipelineProvider.java | 5 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java | 97 ++++-----
 .../safemode/OneReplicaPipelineSafeModeRule.java | 64 ++----
 .../hdds/scm/safemode/SCMSafeModeManager.java | 13 +-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java | 5 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java | 1 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java | 20 ++
 .../hdds/scm/server/StorageContainerManager.java | 4 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java | 34 +++
 .../container/TestCloseContainerEventHandler.java | 3 +
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 11 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java | 9 +-
 .../scm/pipeline/MockRatisPipelineProvider.java | 3 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java | 39 +---
 .../TestOneReplicaPipelineSafeModeRule.java | 33 +--
 .../hdds/scm/safemode/TestSCMSafeModeManager.java | 36 ++--
 .../TestContainerStateManagerIntegration.java | 4 +-
 .../hdds/scm/pipeline/TestPipelineClose.java | 2 +-
 .../scm/pipeline/TestRatisPipelineProvider.java | 15 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java | 46 ++---
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 16 +-
 48 files changed, 1265 insertions(+), 493 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
   public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
       "hdds.scm.safemode.pipeline-availability.check";
   public static final boolean
-      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+  public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+      "hdds.scm.safemode.pipeline.creation";
+  public static final boolean
+      HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
 
   // % of containers which should have at least one reported replica
   // before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
       "hdds.scm.safemode.threshold.pct";
   public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
-
   // percentage of healthy pipelines, where all 3 datanodes are reported in the
   // pipeline.
   public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
       "hdds.scm.safemode.healthy.pipelie.pct";
   public static final double
       HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+  // number of healthy RATIS pipelines (factor ONE or THREE)
+  public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+      "hdds.scm.safemode.min.pipeline";
+  public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
 
   public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
       "hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 1627569..9d4b728 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -51,6 +51,7 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
   private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+  private final long creationTime;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -65,6 +66,7 @@ public final class Pipeline {
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
+    this.creationTime = System.currentTimeMillis();
   }
 
   /**
@@ -135,6 +137,11 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public boolean isAllocationTimeout() {
+    //TODO: define a system property to control the timeout value
+    return false;
+  }
+
   public void setNodesInOrder(List<DatanodeDetails> nodes) {
     nodesInOrder.set(nodes);
   }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a038047..1d592fa 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -311,15 +311,6 @@
     defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>hdds.command.status.report.interval</name>
-    <value>60000ms</value>
-    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
-    <description>Time interval of the datanode to send status of command
-      execution. Each datanode periodically the execution status of commands
-      received from SCM to SCM.
Unit could be defined with postfix - (ns,ms,s,m,h,d)</description> - </property> - <property> <name>hdds.pipeline.report.interval</name> <value>60000ms</value> <tag>OZONE, PIPELINE, MANAGEMENT</tag> @@ -1315,7 +1306,7 @@ <property> <name>hdds.scm.safemode.pipeline-availability.check</name> - <value>false</value> + <value>true</value> <tag>HDDS,SCM,OPERATION</tag> <description> Boolean value to enable pipeline availability check during SCM safe mode. @@ -1401,6 +1392,25 @@ </property> <property> + <name>hdds.scm.safemode.pipeline.creation</name> + <value>true</value> + <tag>HDDS,SCM,OPERATION</tag> + <description> + Boolean value to enable background pipeline creation in SCM safe mode. + </description> + </property> + + <property> + <name>hdds.scm.safemode.min.pipeline</name> + <value>1</value> + <tag>HDDS,SCM,OPERATION</tag> + <description> + Minimum RATIS pipeline number to exit SCM safe mode. Considered only when + "hdds.scm.safemode.pipeline.creation" is True. + </description> + </property> + + <property> <name>hdds.lock.max.concurrency</name> <value>100</value> <tag>HDDS</tag> diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml index 2f89fa2..bb85f3c 100644 --- a/hadoop-hdds/container-service/pom.xml +++ b/hadoop-hdds/container-service/pom.xml @@ -38,6 +38,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>hadoop-hdds-server-framework</artifactId> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds-client</artifactId> + <version>${hdds.version}</version> + </dependency> + <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> </dependency> diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c9eb702..926f19c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CloseContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .ClosePipelineCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .CreatePipelineCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .DeleteBlocksCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .DeleteContainerCommandHandler; @@ -126,6 +130,8 @@ public class DatanodeStateMachine implements Closeable { conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) .addHandler(new DeleteContainerCommandHandler()) + .addHandler(new ClosePipelineCommandHandler()) + .addHandler(new CreatePipelineCommandHandler()) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) @@ -486,4 +492,12 @@ public class DatanodeStateMachine implements Closeable { public ReplicationSupervisor getSupervisor() { return supervisor; } + + public Configuration getConf() { + 
return conf; + } + + public CertificateClient getCertificateClient() { + return dnCertClient; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 2c01f3a..17e5502 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.container.common.states.datanode import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder; +import org.apache.hadoop.ozone.protocol.commands + .CreatePipelineCommandStatus.CreatePipelineCommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import static java.lang.Math.min; @@ -464,6 +466,14 @@ public class StateContext { .setType(cmd.getType()) .build()); } + if (cmd.getType() == SCMCommandProto.Type.createPipelineCommand) { + addCmdStatus(cmd.getId(), + CreatePipelineCommandStatusBuilder.newBuilder() + .setCmdId(cmd.getId()) + .setStatus(Status.PENDING) + .setType(cmd.getType()) + .build()); + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 881fea0..f7b80cd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; /** * Handler for close container command received from SCM. 
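The hunks below replace the handler's plain int invocation counter with an AtomicLong so that concurrent command executions do not lose increments. A minimal sketch of the counting pattern being adopted (the class and method names here are illustrative, not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    // Thread-safe invocation counting, mirroring the pattern in the hunks below.
    class InvocationMetrics {
      private final AtomicLong invocationCount = new AtomicLong(0);
      private long totalTime; // updated from the handler thread, as in the patch

      void record(long elapsedMillis) {
        invocationCount.incrementAndGet();
        totalTime += elapsedMillis;
      }

      long averageRunTime() {
        long count = invocationCount.get();
        return count > 0 ? totalTime / count : 0;
      }
    }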
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler { private static final Logger LOG = LoggerFactory.getLogger(CloseContainerCommandHandler.class); - private int invocationCount; + private AtomicLong invocationCount = new AtomicLong(0); private long totalTime; /** @@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler { public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { LOG.debug("Processing Close Container command."); - invocationCount++; + invocationCount.incrementAndGet(); final long startTime = Time.monotonicNow(); final DatanodeDetails datanodeDetails = context.getParent() .getDatanodeDetails(); @@ -159,7 +160,7 @@ public class CloseContainerCommandHandler implements CommandHandler { */ @Override public int getInvocationCount() { - return invocationCount; + return (int)invocationCount.get(); } /** @@ -169,8 +170,8 @@ public class CloseContainerCommandHandler implements CommandHandler { */ @Override public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; + if (invocationCount.get() > 0) { + return totalTime / invocationCount.get(); } return 0; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java new file mode 100644 index 0000000..bd48839 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto; +import org.apache.hadoop.hdds.protocol.proto. 
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.hdds.security.x509.certificate.client + .CertificateClient; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Handler for close pipeline command received from SCM. + */ +public class ClosePipelineCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(ClosePipelineCommandHandler.class); + + private AtomicLong invocationCount = new AtomicLong(0); + private long totalTime; + + /** + * Constructs a closePipelineCommand handler. + */ + public ClosePipelineCommandHandler() { + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param ozoneContainer - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. + */ + @Override + public void handle(SCMCommand command, OzoneContainer ozoneContainer, + StateContext context, SCMConnectionManager connectionManager) { + invocationCount.incrementAndGet(); + final long startTime = Time.monotonicNow(); + final DatanodeDetails datanode = context.getParent() + .getDatanodeDetails(); + final ClosePipelineCommandProto closeCommand = + ((ClosePipelineCommand)command).getProto(); + final PipelineID pipelineID = PipelineID.getFromProtobuf( + closeCommand.getPipelineID()); + + try { + destroyPipeline(datanode, pipelineID, context); + LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID, + datanode.getUuidString()); + } catch (IOException e) { + LOG.error("Can't close pipeline #{}", pipelineID, e); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + + } + + /** + * Returns the command type that this command handler handles. + * + * @return Type + */ + @Override + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.closePipelineCommand; + } + + /** + * Returns number of times this handler has been invoked. + * + * @return int + */ + @Override + public int getInvocationCount() { + return (int)invocationCount.get(); + } + + /** + * Returns the average time this function takes to run. + * + * @return long + */ + @Override + public long getAverageRunTime() { + if (invocationCount.get() > 0) { + return totalTime / invocationCount.get(); + } + return 0; + } + + /** + * Destroy pipeline on this datanode. 
+ * + * @param dn - Datanode on which pipeline needs to be destroyed + * @param pipelineID - ID of pipeline to be destroyed + * @param context - Ozone datanode context + * @throws IOException + */ + void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, + StateContext context) throws IOException { + final Configuration ozoneConf = context.getParent().getConf(); + final String rpcType = ozoneConf + .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); + final RaftPeer p = RatisHelper.toRaftPeer(dn); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(ozoneConf); + final CertificateClient dnCertClient = + context.getParent().getCertificateClient(); + final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN( + new SecurityConfig(ozoneConf), dnCertClient); + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(ozoneConf); + try(RaftClient client = RatisHelper + .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, + retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) { + client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), + true, p.getId()); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java index 1ea0ea8..dca02f6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -68,7 +68,7 @@ public interface CommandHandler { default void updateCommandStatus(StateContext context, SCMCommand command, Consumer<CommandStatus> cmdStatusUpdater, Logger log) { if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) { - log.debug("{} with Id:{} not found.", command.getType(), + log.warn("{} with Id:{} not found.", command.getType(), command.getId()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java new file mode 100644 index 0000000..e6d20e7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; +import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.security.x509.SecurityConfig; +import org.apache.hadoop.hdds.security.x509.certificate.client + .CertificateClient; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; +import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommandStatus; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Handler for create pipeline command received from SCM. + */ +public class CreatePipelineCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(CreatePipelineCommandHandler.class); + + private AtomicLong invocationCount = new AtomicLong(0); + private long totalTime; + + /** + * Constructs a createPipelineCommand handler. + */ + public CreatePipelineCommandHandler() { + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param ozoneContainer - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. 
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent()
+        .getDatanodeDetails();
+    final CreatePipelineCommandProto createCommand =
+        ((CreatePipelineCommand)command).getProto();
+    final PipelineID pipelineID = PipelineID.getFromProtobuf(
+        createCommand.getPipelineID());
+    Collection<DatanodeDetails> peers =
+        createCommand.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+
+    final CreatePipelineACKProto createPipelineACK =
+        CreatePipelineACKProto.newBuilder()
+            .setPipelineID(createCommand.getPipelineID())
+            .setDatanodeUUID(dn.getUuidString()).build();
+    boolean success = false;
+    try {
+      createPipeline(dn, pipelineID, peers, context);
+      success = true;
+      LOG.info("Create Pipeline {} {} #{} command succeeded on datanode {}.",
+          createCommand.getType(), createCommand.getFactor(), pipelineID,
+          dn.getUuidString());
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+    } catch (IOException e) {
+      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+          createCommand.getFactor(), pipelineID, e);
+    } finally {
+      final boolean cmdExecuted = success;
+      Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
+        cmdStatus.setStatus(cmdExecuted);
+        ((CreatePipelineCommandStatus)cmdStatus)
+            .setCreatePipelineAck(createPipelineACK);
+      };
+      updateCommandStatus(context, command, statusUpdater, LOG);
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+
+  /**
+   * Sends a Ratis command to create the pipeline on this datanode.
+   *
+   * @param dn - this datanode
+   * @param pipelineId - pipeline ID
+   * @param peers - datanodes of the pipeline
+   * @param context - Ozone datanode context
+   * @throws IOException
+   */
+  private void createPipeline(DatanodeDetails dn, PipelineID pipelineId,
+      Collection<DatanodeDetails> peers, StateContext context)
+      throws IOException {
+    final Configuration ozoneConf = context.getParent().getConf();
+    final RaftGroup group = RatisHelper.newRaftGroup(
+        RaftGroupId.valueOf(pipelineId.getId()), peers);
+    LOG.debug("Creating pipeline #{} with {}", pipelineId, group);
+
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    final List<IOException> exceptions =
+        Collections.synchronizedList(new ArrayList<>());
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+    final CertificateClient dnCertClient =
+        context.getParent().getCertificateClient();
+    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
+        new SecurityConfig(ozoneConf), dnCertClient);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
+    try {
+      final RaftPeer peer = RatisHelper.toRaftPeer(dn);
+      try (RaftClient client = RatisHelper
+          .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), peer,
+              retryPolicy, maxOutstandingRequests, tlsConfig,
+              requestTimeout)) {
+
+        RaftClientReply reply = client.groupAdd(group, peer.getId());
+        if (reply == null || !reply.isSuccess()) {
+          String msg = "Pipeline initialization failed for pipeline:"
+              + pipelineId.getId() + " node:" + peer.getId();
+          throw new IOException(msg);
+        }
+      } catch (IOException ioe) {
+        String errMsg =
+            "Failed to invoke Ratis RPC for " + dn.getUuid();
+        exceptions.add(new IOException(errMsg, ioe));
+      }
+    } catch (RejectedExecutionException ex) {
+      throw new IOException(ex.getClass().getName() + " exception occurred "
+          + "during createPipeline", ex);
+    }
+
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
       }
       this.context.addCommand(deleteContainerCommand);
       break;
+    case createPipelineCommand:
+      CreatePipelineCommand
createPipelineCommand = + CreatePipelineCommand.getFromProtobuf( + commandResponseProto.getCreatePipelineCommandProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM create pipeline request {}", + createPipelineCommand.getPipelineID()); + } + this.context.addCommand(createPipelineCommand); + break; + case closePipelineCommand: + ClosePipelineCommand closePipelineCommand = + ClosePipelineCommand.getFromProtobuf( + commandResponseProto.getClosePipelineCommandProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM close pipeline request {}", + closePipelineCommand.getPipelineID()); + } + this.context.addCommand(closePipelineCommand); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCommandType().name()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java new file mode 100644 index 0000000..1f75bc3 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; + +/** + * Asks datanode to close a pipeline. + */ +public class ClosePipelineCommand + extends SCMCommand<ClosePipelineCommandProto> { + + private final PipelineID pipelineID; + + public ClosePipelineCommand(final PipelineID pipelineID) { + super(); + this.pipelineID = pipelineID; + } + + public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) { + super(cmdId); + this.pipelineID = pipelineID; + } + + /** + * Returns the type of this command. 
+ * + * @return Type + */ + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.closePipelineCommand; + } + + @Override + public ClosePipelineCommandProto getProto() { + ClosePipelineCommandProto.Builder builder = + ClosePipelineCommandProto.newBuilder(); + builder.setCmdId(getId()); + builder.setPipelineID(pipelineID.getProtobuf()); + return builder.build(); + } + + public static ClosePipelineCommand getFromProtobuf( + ClosePipelineCommandProto createPipelineProto) { + Preconditions.checkNotNull(createPipelineProto); + return new ClosePipelineCommand(createPipelineProto.getCmdId(), + PipelineID.getFromProtobuf(createPipelineProto.getPipelineID())); + } + + public PipelineID getPipelineID() { + return pipelineID; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java new file mode 100644 index 0000000..9e22cbc --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Asks datanode to create a pipeline. 
+ */ +public class CreatePipelineCommand + extends SCMCommand<CreatePipelineCommandProto> { + + private final PipelineID pipelineID; + private final ReplicationFactor factor; + private final ReplicationType type; + private final List<DatanodeDetails> nodelist; + + public CreatePipelineCommand(final PipelineID pipelineID, + final ReplicationType type, final ReplicationFactor factor, + final List<DatanodeDetails> datanodeList) { + super(); + this.pipelineID = pipelineID; + this.factor = factor; + this.type = type; + this.nodelist = datanodeList; + } + + public CreatePipelineCommand(long cmdId, final PipelineID pipelineID, + final ReplicationType type, final ReplicationFactor factor, + final List<DatanodeDetails> datanodeList) { + super(cmdId); + this.pipelineID = pipelineID; + this.factor = factor; + this.type = type; + this.nodelist = datanodeList; + } + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.createPipelineCommand; + } + + @Override + public CreatePipelineCommandProto getProto() { + return CreatePipelineCommandProto.newBuilder() + .setCmdId(getId()) + .setPipelineID(pipelineID.getProtobuf()) + .setFactor(factor) + .setType(type) + .addAllDatanode(nodelist.stream() + .map(DatanodeDetails::getProtoBufMessage) + .collect(Collectors.toList())) + .build(); + } + + public static CreatePipelineCommand getFromProtobuf( + CreatePipelineCommandProto createPipelineProto) { + Preconditions.checkNotNull(createPipelineProto); + return new CreatePipelineCommand(createPipelineProto.getCmdId(), + PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()), + createPipelineProto.getType(), createPipelineProto.getFactor(), + createPipelineProto.getDatanodeList().stream() + .map(DatanodeDetails::getFromProtoBuf) + .collect(Collectors.toList())); + } + + public PipelineID getPipelineID() { + return pipelineID; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java new file mode 100644 index 0000000..b5e7d6c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; +import org.apache.hadoop.hdds.protocol.proto. 
+ StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.CommandStatus.Status; +/** + * Command status to report about pipeline creation. + */ +public class CreatePipelineCommandStatus extends CommandStatus { + + private CreatePipelineACKProto createPipelineAck; + + public CreatePipelineCommandStatus(Type type, Long cmdId, Status status, + String msg, CreatePipelineACKProto ack) { + super(type, cmdId, status, msg); + this.createPipelineAck = ack; + } + + public void setCreatePipelineAck(CreatePipelineACKProto ack) { + createPipelineAck = ack; + } + + @Override + public CommandStatus getFromProtoBuf( + StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) { + return CreatePipelineCommandStatusBuilder.newBuilder() + .setCreatePipelineAck(cmdStatusProto.getCreatePipelineAck()) + .setCmdId(cmdStatusProto.getCmdId()) + .setStatus(cmdStatusProto.getStatus()) + .setType(cmdStatusProto.getType()) + .setMsg(cmdStatusProto.getMsg()) + .build(); + } + + @Override + public StorageContainerDatanodeProtocolProtos.CommandStatus + getProtoBufMessage() { + StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder = + StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder() + .setCmdId(this.getCmdId()) + .setStatus(this.getStatus()) + .setType(this.getType()); + if (createPipelineAck != null) { + builder.setCreatePipelineAck(createPipelineAck); + } + if (this.getMsg() != null) { + builder.setMsg(this.getMsg()); + } + return builder.build(); + } + + /** + * Builder for CreatePipelineCommandStatus. + */ + public static final class CreatePipelineCommandStatusBuilder + extends CommandStatusBuilder { + private CreatePipelineACKProto createPipelineAck = null; + + public static CreatePipelineCommandStatusBuilder newBuilder() { + return new CreatePipelineCommandStatusBuilder(); + } + + public CreatePipelineCommandStatusBuilder setCreatePipelineAck( + CreatePipelineACKProto ack) { + this.createPipelineAck = ack; + return this; + } + + @Override + public CommandStatus build() { + return new CreatePipelineCommandStatus(getType(), getCmdId(), getStatus(), + getMsg(), createPipelineAck); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 1d09dfa..90124c6 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -177,6 +177,7 @@ message CommandStatus { required SCMCommandProto.Type type = 3; optional string msg = 4; optional ContainerBlocksDeletionACKProto blockDeletionAck = 5; + optional CreatePipelineACKProto createPipelineAck = 6; } message ContainerActionsProto { @@ -243,6 +244,8 @@ message SCMCommandProto { closeContainerCommand = 3; deleteContainerCommand = 4; replicateContainerCommand = 5; + createPipelineCommand = 6; + closePipelineCommand = 7; } // TODO: once we start using protoc 3.x, refactor this message using "oneof" required Type commandType = 1; @@ -251,6 +254,8 @@ message SCMCommandProto { optional CloseContainerCommandProto closeContainerCommandProto = 4; optional DeleteContainerCommandProto deleteContainerCommandProto = 5; optional ReplicateContainerCommandProto replicateContainerCommandProto = 6; + optional CreatePipelineCommandProto createPipelineCommandProto = 7; + optional 
ClosePipelineCommandProto closePipelineCommandProto = 8; } /** @@ -320,6 +325,32 @@ message ReplicateContainerCommandProto { } /** +This command asks the datanode to create a pipeline. +*/ +message CreatePipelineCommandProto { + required PipelineID pipelineID = 1; + required ReplicationType type = 2; + required ReplicationFactor factor = 3; + repeated DatanodeDetailsProto datanode = 4; + required int64 cmdId = 5; +} + +// ACK message datanode sent to SCM, contains the result of +// pipeline creation status. +message CreatePipelineACKProto { + required PipelineID pipelineID = 1; + required string datanodeUUID = 2; +} + +/** +This command asks the datanode to close a pipeline. +*/ +message ClosePipelineCommandProto { + required PipelineID pipelineID = 1; + required int64 cmdId = 2; +} + +/** * Protocol used from a datanode to StorageContainerManager. * * Please see the request and response messages for details of the RPC calls. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index a0a7222..1390c52 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -24,8 +24,11 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.management.ObjectName; + +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -79,6 +82,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { private ObjectName mxBean; private SafeModePrecheck safeModePrecheck; + private final long pipelineCreateWaitTimeout; /** * Constructor. 
@@ -117,6 +121,22 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { scm.getScmNodeManager(), scm.getEventQueue(), svcInterval, serviceTimeout, conf); safeModePrecheck = new SafeModePrecheck(conf); + + long heartbeatInterval = conf.getTimeDuration( + HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, + HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + long commandStatusReportInterval = conf.getTimeDuration( + HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL, + HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + Preconditions.checkState(heartbeatInterval <= commandStatusReportInterval, + "Heartbeat interval is smaller than command status report interval"); + pipelineCreateWaitTimeout = + ((commandStatusReportInterval + heartbeatInterval - 1) + / heartbeatInterval) * heartbeatInterval + 5000L; } /** @@ -188,6 +208,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { // TODO: #CLUTIL Remove creation logic when all replication types and // factors are handled by pipeline creator pipeline = pipelineManager.createPipeline(type, factor); + // wait until pipeline is ready + long current = System.currentTimeMillis(); + while (!pipeline.isOpen() && System.currentTimeMillis() < + (current + pipelineCreateWaitTimeout)) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } } catch (IOException e) { LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " + "get pipelines call once.", type, factor, e); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index d1479f7..aad5573 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hdds.scm.command; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher @@ -55,13 +53,21 @@ public class CommandStatusReportHandler implements cmdStatusList.forEach(cmdStatus -> { LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus .getCmdId(), cmdStatus.getType()); - if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) { + switch (cmdStatus.getType()) { + case deleteBlocksCommand: if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) { publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new DeleteBlockStatus(cmdStatus)); } - } else { - LOGGER.debug("CommandStatus of type:{} not handled in " + + break; + case createPipelineCommand: + if (cmdStatus.getStatus() != CommandStatus.Status.PENDING) { + publisher.fireEvent(SCMEvents.CREATE_PIPELINE_STATUS, + new CreatePipelineStatus(cmdStatus)); + } + break; + default: + LOGGER.warn("CommandStatus of type:{} not handled in " + "CommandStatusReportHandler.", cmdStatus.getType()); } }); @@ -101,4 +107,12 @@ public class CommandStatusReportHandler implements } } + /** + * Wrapper event for CreatePipeline Command Status. 
+ */ + public static class CreatePipelineStatus extends CommandStatusEvent { + public CreatePipelineStatus(CommandStatus cmdStatus) { + super(cmdStatus); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 43d396e..dd476a7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -40,6 +41,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .NodeReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer .NodeRegistrationContainerReport; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CreatePipelineStatus; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.TypedEvent; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; @@ -97,15 +100,14 @@ public final class SCMEvents { new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report"); /** - * PipelineReport processed by pipeline report handler. This event is + * Open pipeline event sent by ScmPipelineManager. This event is * received by HealthyPipelineSafeModeRule. */ - public static final TypedEvent<PipelineReportFromDatanode> - PROCESSED_PIPELINE_REPORT = new TypedEvent<>( - PipelineReportFromDatanode.class, "Processed_Pipeline_Report"); + public static final TypedEvent<Pipeline> + OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline"); /** - * PipelineActions are sent by Datanode. This event is received by + * PipelineActions are sent by Datanode to close a pipeline. It's received by * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated. */ public static final TypedEvent<PipelineActionsFromDatanode> @@ -113,7 +115,7 @@ public final class SCMEvents { "Pipeline_Actions"); /** - * A Command status report will be sent by datanodes. This repoort is received + * A Command status report will be sent by datanodes. This report is received * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated. */ public static final TypedEvent<CommandStatusReportFromDatanode> @@ -197,6 +199,14 @@ public final class SCMEvents { new TypedEvent<>(SafeModeStatus.class); /** + * This event is triggered by CommandStatusReportHandler whenever a + * status for CreatePipeline SCMCommand is received. + */ + public static final TypedEvent<CreatePipelineStatus> + CREATE_PIPELINE_STATUS = + new TypedEvent<>(CreatePipelineStatus.class, "Create_Pipeline_Status"); + + /** * Private Ctor. Never Constructed. 
*/ private SCMEvents() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java index 77e037a..86ad5ee 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.hadoop.hdds.server.events.EventPublisher; + import java.io.IOException; import java.util.HashMap; @@ -39,12 +40,13 @@ public final class PipelineFactory { private Map<ReplicationType, PipelineProvider> providers; PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager, - Configuration conf, GrpcTlsConfig tlsConfig) { + Configuration conf, EventPublisher eventPublisher) { providers = new HashMap<>(); providers.put(ReplicationType.STAND_ALONE, new SimplePipelineProvider(nodeManager)); providers.put(ReplicationType.RATIS, - new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig)); + new RatisPipelineProvider(nodeManager, stateManager, conf, + eventPublisher)); } @VisibleForTesting @@ -63,6 +65,11 @@ public final class PipelineFactory { return providers.get(type).create(factor, nodes); } + public void close(ReplicationType type, Pipeline pipeline) + throws IOException { + providers.get(type).close(pipeline); + } + public void shutdown() { providers.values().forEach(provider -> provider.shutdown()); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 9ba5f31..e477860 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -21,8 +21,9 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.hadoop.hdds.server.events.EventHandler; import java.io.Closeable; import java.io.IOException; @@ -33,7 +34,8 @@ import java.util.NavigableSet; /** * Interface which exposes the api for pipeline management. 
*/ -public interface PipelineManager extends Closeable, PipelineManagerMXBean { +public interface PipelineManager extends Closeable, PipelineManagerMXBean, + EventHandler<CommandStatusReportHandler.CreatePipelineStatus> { Pipeline createPipeline(ReplicationType type, ReplicationFactor factor) throws IOException; @@ -51,6 +53,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean { ReplicationFactor factor); List<Pipeline> getPipelines(ReplicationType type, + Pipeline.PipelineState state); + + List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, Pipeline.PipelineState state); List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, @@ -94,6 +99,4 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean { * @throws IOException in case of any Exception */ void deactivatePipeline(PipelineID pipelineID) throws IOException; - - GrpcTlsConfig getGrpcTlsConfig(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java index a0ce216..c00ff78 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -33,5 +33,7 @@ public interface PipelineProvider { Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes); + void close(Pipeline pipeline) throws IOException; + void shutdown(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 2b11da9..6b9a839 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -26,17 +26,18 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Objects; /** * Handles Pipeline Reports from datanode. 
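In the PipelineReportHandler changes below, SCM stops tearing down unknown pipelines through a direct Ratis call and instead queues a ClosePipelineCommand that reaches the datanode on its next heartbeat. A condensed sketch of that dispatch path, using the classes this commit adds; the wrapper class and method name are illustrative, while the event wiring follows the patch:

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
    import org.apache.hadoop.hdds.server.events.EventPublisher;
    import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
    import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;

    class ClosePipelineDispatchSketch {
      // Ask the reporting datanode to tear down a pipeline SCM no longer tracks.
      static void requestClose(EventPublisher publisher, DatanodeDetails dn,
          PipelineID pipelineID) {
        final ClosePipelineCommand closeCommand =
            new ClosePipelineCommand(pipelineID);
        final CommandForDatanode datanodeCommand =
            new CommandForDatanode<>(dn.getUuid(), closeCommand);
        publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
      }
    }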
@@ -52,17 +53,14 @@ public class PipelineReportHandler implements private final boolean pipelineAvailabilityCheck; public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager, - PipelineManager pipelineManager, - Configuration conf) { + PipelineManager pipelineManager, Configuration conf) { Preconditions.checkNotNull(pipelineManager); - Objects.requireNonNull(scmSafeModeManager); this.scmSafeModeManager = scmSafeModeManager; this.pipelineManager = pipelineManager; this.conf = conf; this.pipelineAvailabilityCheck = conf.getBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT); - } @Override @@ -77,28 +75,26 @@ public class PipelineReportHandler implements LOGGER.trace("Processing pipeline report for dn: {}", dn); for (PipelineReport report : pipelineReport.getPipelineReportList()) { try { - processPipelineReport(report, dn); + processPipelineReport(report, dn, publisher); } catch (IOException e) { LOGGER.error("Could not process pipeline report={} from dn={} {}", report, dn, e); } } - if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) { - publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - pipelineReportFromDatanode); - } - } - private void processPipelineReport(PipelineReport report, DatanodeDetails dn) - throws IOException { + private void processPipelineReport(PipelineReport report, DatanodeDetails dn, + EventPublisher publisher) throws IOException { PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID()); Pipeline pipeline; try { pipeline = pipelineManager.getPipeline(pipelineID); } catch (PipelineNotFoundException e) { - RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf, - pipelineManager.getGrpcTlsConfig()); + final ClosePipelineCommand closeCommand = + new ClosePipelineCommand(pipelineID); + final CommandForDatanode datanodeCommand = + new CommandForDatanode<>(dn.getUuid(), closeCommand); + publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); return; } @@ -108,6 +104,10 @@ public class PipelineReportHandler implements if (pipeline.isHealthy()) { // if all the dns have reported, pipeline can be moved to OPEN state pipelineManager.openPipeline(pipelineID); + } else { + if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) { + publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); + } } } else { // In OPEN state case just report the datanode diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 7615057..93fbbd1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -131,9 +131,9 @@ class PipelineStateManager { throw new IOException("Closed pipeline can not be opened"); } if (pipeline.getPipelineState() == PipelineState.ALLOCATED) { + LOG.info("Pipeline {} moved to OPEN state", pipeline.toString()); pipeline = pipelineStateMap .updatePipelineState(pipelineId, PipelineState.OPEN); - LOG.info("Pipeline {} moved to OPEN state", pipeline.toString()); } return pipeline; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 
9409728..b9aff86 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -24,37 +24,29 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.PlacementPolicy; -import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; -import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.hdds.ratis.RatisHelper; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.grpc.GrpcTlsConfig; -import org.apache.ratis.protocol.RaftClientReply; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.function.CheckedBiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -69,6 +61,7 @@ public class RatisPipelineProvider implements PipelineProvider { private final NodeManager nodeManager; private final PipelineStateManager stateManager; private final Configuration conf; + private final EventPublisher eventPublisher; // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. 
private final int parallelismForPool = 3; @@ -83,15 +76,14 @@ public class RatisPipelineProvider implements PipelineProvider { private final ForkJoinPool forkJoinPool = new ForkJoinPool( parallelismForPool, factory, null, false); - private final GrpcTlsConfig tlsConfig; RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf, - GrpcTlsConfig tlsConfig) { + EventPublisher eventPublisher) { this.nodeManager = nodeManager; this.stateManager = stateManager; this.conf = conf; - this.tlsConfig = tlsConfig; + this.eventPublisher = eventPublisher; } @@ -155,12 +147,25 @@ public class RatisPipelineProvider implements PipelineProvider { Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setState(PipelineState.OPEN) + .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(dns) .build(); - initializePipeline(pipeline); + + // Send command to datanode to create pipeline + final CreatePipelineCommand createCommand = + new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), + factor, dns); + + dns.stream().forEach(node -> { + final CommandForDatanode datanodeCommand = + new CommandForDatanode<>(node.getUuid(), createCommand); + LOG.info("Send pipeline:{} create command to datanode {}", + pipeline.getId(), datanodeCommand.getDatanodeId()); + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); + }); + return pipeline; } @@ -188,67 +193,24 @@ public class RatisPipelineProvider implements PipelineProvider { } } - protected void initializePipeline(Pipeline pipeline) throws IOException { + /** + * Removes pipeline from SCM. Sends command to destroy pipeline on all + * the datanodes. + * + * @param pipeline - Pipeline to be destroyed + * @throws IOException + */ + public void close(Pipeline pipeline) { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getNodes(), - (raftClient, peer) -> { - RaftClientReply reply = raftClient.groupAdd(group, peer.getId()); - if (reply == null || !reply.isSuccess()) { - String msg = "Pipeline initialization failed for pipeline:" - + pipeline.getId() + " node:" + peer.getId(); - LOG.error(msg); - throw new IOException(msg); - } - }); - } - - private void callRatisRpc(List<DatanodeDetails> datanodes, - CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc) - throws IOException { - if (datanodes.isEmpty()) { - return; - } - - final String rpcType = conf - .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); - final List< IOException > exceptions = - Collections.synchronizedList(new ArrayList<>()); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(conf); - final TimeDuration requestTimeout = - RatisHelper.getClientRequestTimeout(conf); - try { - forkJoinPool.submit(() -> { - datanodes.parallelStream().forEach(d -> { - final RaftPeer p = RatisHelper.toRaftPeer(d); - try (RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig, - requestTimeout)) { - rpc.accept(client, p); - } catch (IOException ioe) { - String errMsg = - "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid(); - LOG.error(errMsg, ioe); - exceptions.add(new IOException(errMsg, ioe)); - } - }); - }).get(); - } catch (ExecutionException 
| RejectedExecutionException ex) { - LOG.error(ex.getClass().getName() + " exception occurred during " + - "createPipeline", ex); - throw new IOException(ex.getClass().getName() + " exception occurred " + - "during createPipeline", ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupt exception occurred during " + - "createPipeline", ex); - } - if (!exceptions.isEmpty()) { - throw MultipleIOException.createIOException(exceptions); - } + LOG.debug("Destroy pipeline:{} with {} ", pipeline.getId(), group); + final ClosePipelineCommand closeCommand = + new ClosePipelineCommand(pipeline.getId()); + pipeline.getNodes().stream().forEach(node -> { + final CommandForDatanode datanodeCommand = + new CommandForDatanode<>(node.getUuid(), closeCommand); + LOG.info("Send pipeline:{} close command to datanode {}", + pipeline.getId(), datanodeCommand.getDatanodeId()); + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); + }); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java deleted file mode 100644 index 777a0b0..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.pipeline; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.ratis.RatisHelper; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.grpc.GrpcTlsConfig; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.util.TimeDuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Utility class for Ratis pipelines. Contains methods to create and destroy - * ratis pipelines. - */ -public final class RatisPipelineUtils { - - private static final Logger LOG = - LoggerFactory.getLogger(RatisPipelineUtils.class); - - private RatisPipelineUtils() { - } - /** - * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all - * the datanodes. 
- * - * @param pipeline - Pipeline to be destroyed - * @param ozoneConf - Ozone configuration - * @param grpcTlsConfig - * @throws IOException - */ - static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf, - GrpcTlsConfig grpcTlsConfig) { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); - for (DatanodeDetails dn : pipeline.getNodes()) { - try { - destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig); - } catch (IOException e) { - LOG.warn("Pipeline destroy failed for pipeline={} dn={}", - pipeline.getId(), dn); - } - } - } - - /** - * Sends ratis command to destroy pipeline on the given datanode. - * - * @param dn - Datanode on which pipeline needs to be destroyed - * @param pipelineID - ID of pipeline to be destroyed - * @param ozoneConf - Ozone configuration - * @param grpcTlsConfig - grpc tls configuration - * @throws IOException - */ - static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, - Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException { - final String rpcType = ozoneConf - .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - final RaftPeer p = RatisHelper.toRaftPeer(dn); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(ozoneConf); - final TimeDuration requestTimeout = - RatisHelper.getClientRequestTimeout(ozoneConf); - try(RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, grpcTlsConfig, - requestTimeout)) { - client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), - true, p.getId()); - } - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 0964f6d..405baa8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -21,23 +21,30 @@ package org.apache.hadoop.hdds.scm.pipeline; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CreatePipelineStatus; import org.apache.hadoop.metrics2.util.MBeans; import 
org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.MetadataStore; import org.apache.hadoop.hdds.utils.MetadataStoreBuilder; import org.apache.hadoop.hdds.utils.Scheduler; -import org.apache.ratis.grpc.GrpcTlsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,18 +88,20 @@ public class SCMPipelineManager implements PipelineManager { private final NodeManager nodeManager; private final SCMPipelineMetrics metrics; private final Configuration conf; + private final StorageContainerManager scm; + private boolean pipelineAvailabilityCheck; + private boolean createPipelineInSafemode; // Pipeline Manager MXBean private ObjectName pmInfoBean; - private GrpcTlsConfig grpcTlsConfig; public SCMPipelineManager(Configuration conf, NodeManager nodeManager, - EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig) + EventPublisher eventPublisher, StorageContainerManager scm) throws IOException { this.lock = new ReentrantReadWriteLock(); this.conf = conf; this.stateManager = new PipelineStateManager(conf); this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, - conf, grpcTlsConfig); + conf, eventPublisher); // TODO: See if thread priority needs to be set for these threads scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); this.backgroundPipelineCreator = @@ -113,8 +122,14 @@ public class SCMPipelineManager implements PipelineManager { this.metrics = SCMPipelineMetrics.create(); this.pmInfoBean = MBeans.register("SCMPipelineManager", "SCMPipelineManagerInfo", this); + this.scm = scm; + this.pipelineAvailabilityCheck = conf.getBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT); + this.createPipelineInSafemode = conf.getBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT); initializePipelineState(); - this.grpcTlsConfig = grpcTlsConfig; } public PipelineStateManager getStateManager() { @@ -130,6 +145,9 @@ public class SCMPipelineManager implements PipelineManager { private void initializePipelineState() throws IOException { if (pipelineStore.isEmpty()) { LOG.info("No pipeline exists in current db"); + if (pipelineAvailabilityCheck && createPipelineInSafemode) { + startPipelineCreator(); + } return; } List<Map.Entry<byte[], byte[]>> pipelines = @@ -148,8 +166,8 @@ public class SCMPipelineManager implements PipelineManager { } @Override - public synchronized Pipeline createPipeline( - ReplicationType type, ReplicationFactor factor) throws IOException { + public synchronized Pipeline createPipeline(ReplicationType type, + ReplicationFactor factor) throws IOException { lock.writeLock().lock(); try { Pipeline pipeline = pipelineFactory.create(type, factor); @@ -157,8 +175,11 @@ public class SCMPipelineManager implements PipelineManager { pipeline.getProtobufMessage().toByteArray()); stateManager.addPipeline(pipeline); nodeManager.addPipeline(pipeline); - metrics.incNumPipelineCreated(); - metrics.createPerPipelineMetrics(pipeline); + metrics.incNumPipelineAllocated(); + if (pipeline.isOpen()) { + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); + } return pipeline; } catch (InsufficientDatanodesException idEx) { throw idEx; @@ -225,6 +246,16 @@ public class SCMPipelineManager implements PipelineManager { } } + public List<Pipeline> getPipelines(ReplicationType type, + Pipeline.PipelineState 
state) { + lock.readLock().lock(); + try { + return stateManager.getPipelines(type, state); + } finally { + lock.readLock().unlock(); + } + } + @Override public List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, Pipeline.PipelineState state) { @@ -293,7 +324,9 @@ lock.writeLock().lock(); try { Pipeline pipeline = stateManager.openPipeline(pipelineId); - metrics.createPerPipelineMetrics(pipeline); + if (pipelineAvailabilityCheck && scm != null && scm.isInSafeMode()) { + eventPublisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); + } } finally { lock.writeLock().unlock(); } @@ -408,7 +441,7 @@ * @throws IOException */ private void destroyPipeline(Pipeline pipeline) throws IOException { - RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig); + pipelineFactory.close(pipeline.getType(), pipeline); // remove the pipeline from the pipeline manager removePipeline(pipeline.getId()); triggerPipelineCreation(); @@ -441,11 +474,6 @@ } @Override - public GrpcTlsConfig getGrpcTlsConfig() { - return grpcTlsConfig; - } - - @Override public void close() throws IOException { if (scheduler != null) { scheduler.close(); @@ -466,4 +494,63 @@ // shutdown pipeline provider. pipelineFactory.shutdown(); } + + @Override + public void onMessage(CreatePipelineStatus response, + EventPublisher publisher) { + CommandStatus result = response.getCmdStatus(); + CreatePipelineACKProto ack = result.getCreatePipelineAck(); + PipelineID pipelineID = PipelineID.getFromProtobuf(ack.getPipelineID()); + CommandStatus.Status status = result.getStatus(); + LOG.info("Received pipeline {} create status {}", pipelineID, status); + Pipeline pipeline; + try { + pipeline = stateManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + LOG.warn("Pipeline {} cannot be found", pipelineID); + return; + } + + switch (status) { + case EXECUTED: + DatanodeDetails dn = nodeManager.getNodeByUuid(ack.getDatanodeUUID()); + if (dn == null) { + LOG.warn("Datanode {} for Pipeline {} cannot be found", + ack.getDatanodeUUID(), pipelineID); + return; + } + try { + pipeline.reportDatanode(dn); + } catch (IOException e) { + LOG.warn("Updating datanode {} on pipeline {} failed: {}", + dn.getUuidString(), pipelineID, e.getMessage()); + return; + } + // Once all datanodes have reported, the pipeline is ready to move to OPEN + if (pipeline.isHealthy() || + pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + try { + openPipeline(pipelineID); + } catch (IOException e) { + LOG.warn("Failed to open pipeline {}: {}", pipelineID, + e.getMessage()); + return; + } + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); + } + return; + case FAILED: + metrics.incNumPipelineCreationFailed(); + try { + finalizeAndDestroyPipeline(pipeline, false); + } catch (IOException e) { + LOG.warn("Failed to close pipeline {}: {}", pipelineID, + e.getMessage()); + } + return; + default: + LOG.error("Unknown or unexpected status {}", status); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java index d0f7f6e..fa91572 100644 ---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java @@ -46,6 +46,7 @@ public final class SCMPipelineMetrics implements MetricsSource { private MetricsRegistry registry; + private @Metric MutableCounterLong numPipelineAllocated; private @Metric MutableCounterLong numPipelineCreated; private @Metric MutableCounterLong numPipelineCreationFailed; private @Metric MutableCounterLong numPipelineDestroyed; @@ -83,6 +84,7 @@ public final class SCMPipelineMetrics implements MetricsSource { @SuppressWarnings("SuspiciousMethodCalls") public void getMetrics(MetricsCollector collector, boolean all) { MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME); + numPipelineAllocated.snapshot(recordBuilder, true); numPipelineCreated.snapshot(recordBuilder, true); numPipelineCreationFailed.snapshot(recordBuilder, true); numPipelineDestroyed.snapshot(recordBuilder, true); @@ -94,6 +96,7 @@ public final class SCMPipelineMetrics implements MetricsSource { } void createPerPipelineMetrics(Pipeline pipeline) { + // Register a per-pipeline block-allocation counter for this pipeline. numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns .info(getBlockAllocationMetricName(pipeline), "Number of blocks allocated in pipeline " + pipeline.getId()), 0L)); @@ -117,6 +120,14 @@ } /** + * Increments the count of pipeline allocations, whether the creation + * later succeeds or fails. + */ + void incNumPipelineAllocated() { + numPipelineAllocated.incr(); + } + + /** * Increments number of successful pipeline creation count. */ void incNumPipelineCreated() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java index 54e2141..a772a97 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider { } @Override + public void close(Pipeline pipeline) throws IOException { + + } + + @Override public void shutdown() { // Do nothing.
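One consequence of the allocated/created split above, for anyone writing tests against these counters: allocation is counted as soon as createPipeline() returns, while creation is counted only after every datanode has acknowledged the create command. A sketch using Hadoop's MetricsAsserts test helper (assumed to be on the test classpath, and assuming SOURCE_NAME is accessible from the test; the counter values are illustrative):

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    MetricsRecordBuilder metrics = getMetrics(SCMPipelineMetrics.SOURCE_NAME);
    // Right after createPipeline() returns, before any datanode ACK arrives:
    assertCounter("NumPipelineAllocated", 1L, metrics);
    assertCounter("NumPipelineCreated", 0L, metrics);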
} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 7a00d76..5304270 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -20,17 +20,10 @@ package org.apache.hadoop.hdds.scm.safemode; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; - import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -38,9 +31,6 @@ import org.apache.hadoop.hdds.server.events.TypedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Set; - /** * Class defining Safe mode exit criteria for Pipelines. * @@ -49,43 +39,52 @@ import java.util.Set; * through in a cluster. */ public class HealthyPipelineSafeModeRule - extends SafeModeExitRule<PipelineReportFromDatanode>{ + extends SafeModeExitRule<Pipeline>{ public static final Logger LOG = LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class); - private final PipelineManager pipelineManager; private final int healthyPipelineThresholdCount; private int currentHealthyPipelineCount = 0; - private final Set<DatanodeDetails> processedDatanodeDetails = - new HashSet<>(); HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue, PipelineManager pipelineManager, SCMSafeModeManager manager, Configuration configuration) { super(manager, ruleName, eventQueue); - this.pipelineManager = pipelineManager; double healthyPipelinesPercent = configuration.getDouble(HddsConfigKeys. HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, HddsConfigKeys. HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT); + int minHealthyPipelines = 0; + + boolean createPipelineInSafemode = configuration.getBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT); + + if (createPipelineInSafemode) { + minHealthyPipelines = + configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE, + HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT); + } + Preconditions.checkArgument( (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0), HddsConfigKeys. 
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT + " value should be >= 0.0 and <= 1.0"); - // As we want to wait for 3 node pipelines - int pipelineCount = + // As we want to wait for RATIS write pipelines, no matter ONE or THREE + int pipelineCount = pipelineManager.getPipelines( + HddsProtos.ReplicationType.RATIS, Pipeline.PipelineState.OPEN).size() + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE).size(); + Pipeline.PipelineState.ALLOCATED).size(); // This value will be zero when pipeline count is 0. // On a fresh installed cluster, there will be zero pipelines in the SCM // pipeline DB. - healthyPipelineThresholdCount = - (int) Math.ceil(healthyPipelinesPercent * pipelineCount); + healthyPipelineThresholdCount = Math.max(minHealthyPipelines, + (int) Math.ceil(healthyPipelinesPercent * pipelineCount)); LOG.info(" Total pipeline count is {}, healthy pipeline " + "threshold count is {}", pipelineCount, healthyPipelineThresholdCount); @@ -95,67 +94,41 @@ public class HealthyPipelineSafeModeRule } @Override - protected TypedEvent<PipelineReportFromDatanode> getEventType() { - return SCMEvents.PROCESSED_PIPELINE_REPORT; + protected TypedEvent<Pipeline> getEventType() { + return SCMEvents.OPEN_PIPELINE; } @Override protected boolean validate() { if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) { + LOG.info("{} rule satisfied", this.getClass().getSimpleName()); return true; } return false; } @Override - protected void process(PipelineReportFromDatanode - pipelineReportFromDatanode) { + protected void process(Pipeline pipeline) { // When SCM is in safe mode for long time, already registered - // datanode can send pipeline report again, then pipeline handler fires - // processed report event, we should not consider this pipeline report - // from datanode again during threshold calculation. - Preconditions.checkNotNull(pipelineReportFromDatanode); - DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails(); - if (!processedDatanodeDetails.contains( - pipelineReportFromDatanode.getDatanodeDetails())) { - - Pipeline pipeline; - PipelineReportsProto pipelineReport = - pipelineReportFromDatanode.getReport(); - - for (PipelineReport report : pipelineReport.getPipelineReportList()) { - PipelineID pipelineID = PipelineID - .getFromProtobuf(report.getPipelineID()); - try { - pipeline = pipelineManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - continue; - } - - if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && - pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) { - // If the pipeline is open state mean, all 3 datanodes are reported - // for this pipeline. - currentHealthyPipelineCount++; - getSafeModeMetrics().incCurrentHealthyPipelinesCount(); - } - } - if (scmInSafeMode()) { - SCMSafeModeManager.getLogger().info( - "SCM in safe mode. Healthy pipelines reported count is {}, " + - "required healthy pipeline reported count is {}", - currentHealthyPipelineCount, healthyPipelineThresholdCount); - } - - processedDatanodeDetails.add(dnDetails); + // datanode can send pipeline report again, or SCMPipelineManager will + // create new pipelines. + Preconditions.checkNotNull(pipeline); + if (pipeline.getType() == HddsProtos.ReplicationType.RATIS) { + currentHealthyPipelineCount++; + getSafeModeMetrics().incCurrentHealthyPipelinesCount(); } + if (scmInSafeMode()) { + SCMSafeModeManager.getLogger().info( + "SCM in safe mode. 
Healthy pipelines reported count is {}, " + + "required healthy pipeline reported count is {}", + currentHealthyPipelineCount, healthyPipelineThresholdCount); + } } @Override protected void cleanup() { - processedDatanodeDetails.clear(); } @VisibleForTesting diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java index 841d8ff..34ba35c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java @@ -22,17 +22,10 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher. - PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.TypedEvent; import org.slf4j.Logger; @@ -47,14 +40,13 @@ import java.util.Set; * replica available for read when we exit safe mode. 
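A worked example of the new HealthyPipelineSafeModeRule threshold above (all numbers hypothetical): with 4 RATIS pipelines counted across OPEN and ALLOCATED states, a healthy-pipeline percentage of 0.10, and safe mode pipeline creation enabled with HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE resolving to 1:

    int pipelineCount = 4;          // OPEN + ALLOCATED RATIS pipelines
    double healthyPipelinesPercent = 0.10;
    int minHealthyPipelines = 1;    // applies only when creation in safe mode is on
    int threshold = Math.max(minHealthyPipelines,
        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
    // ceil(0.10 * 4) = 1, so threshold = max(1, 1) = 1: this rule is
    // satisfied once a single RATIS pipeline reaches OPEN.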
*/ public class OneReplicaPipelineSafeModeRule extends - SafeModeExitRule<PipelineReportFromDatanode> { + SafeModeExitRule<Pipeline> { private static final Logger LOG = LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class); private int thresholdCount; private Set<PipelineID> reportedPipelineIDSet = new HashSet<>(); - private final PipelineManager pipelineManager; private int currentReportedPipelineCount = 0; @@ -62,7 +54,6 @@ public class OneReplicaPipelineSafeModeRule extends PipelineManager pipelineManager, SCMSafeModeManager safeModeManager, Configuration configuration) { super(safeModeManager, ruleName, eventQueue); - this.pipelineManager = pipelineManager; double percent = configuration.getDouble( @@ -75,69 +66,59 @@ public class OneReplicaPipelineSafeModeRule extends HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT + " value should be >= 0.0 and <= 1.0"); + // Exclude CLOSED pipelines; only OPEN and ALLOCATED ones count int totalPipelineCount = pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE).size(); + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) .size() + + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.ALLOCATED).size(); thresholdCount = (int) Math.ceil(percent * totalPipelineCount); - LOG.info(" Total pipeline count is {}, pipeline's with atleast one " + + LOG.info("Total pipeline count is {}, pipelines with at least one " + "datanode reported threshold count is {}", totalPipelineCount, thresholdCount); getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold( thresholdCount); - } @Override - protected TypedEvent<PipelineReportFromDatanode> getEventType() { - return SCMEvents.PROCESSED_PIPELINE_REPORT; + protected TypedEvent<Pipeline> getEventType() { + return SCMEvents.OPEN_PIPELINE; } @Override protected boolean validate() { if (currentReportedPipelineCount >= thresholdCount) { + LOG.info("{} rule satisfied", this.getClass().getSimpleName()); return true; } return false; } @Override - protected void process(PipelineReportFromDatanode - pipelineReportFromDatanode) { - Pipeline pipeline; - Preconditions.checkNotNull(pipelineReportFromDatanode); - PipelineReportsProto pipelineReport = - pipelineReportFromDatanode.getReport(); - - for (PipelineReport report : pipelineReport.getPipelineReportList()) { - PipelineID pipelineID = PipelineID - .getFromProtobuf(report.getPipelineID()); - try { - pipeline = pipelineManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - continue; - } - - if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && - !reportedPipelineIDSet.contains(pipelineID)) { - reportedPipelineIDSet.add(pipelineID); - getSafeModeMetrics() - .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount(); - } + protected void process(Pipeline pipeline) { + Preconditions.checkNotNull(pipeline); + if (pipeline.getType() == HddsProtos.ReplicationType.RATIS && + pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && + !reportedPipelineIDSet.contains(pipeline.getId())) { + reportedPipelineIDSet.add(pipeline.getId()); + getSafeModeMetrics() + .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount(); } currentReportedPipelineCount = reportedPipelineIDSet.size(); if (scmInSafeMode()) { SCMSafeModeManager.getLogger().info( - "SCM in safe mode.
Pipelines with at least one datanode reported " + + "count is {}, required at least one datanode reported per " + "pipeline count is {}", currentReportedPipelineCount, thresholdCount); } - } @Override @@ -154,5 +135,4 @@ public class OneReplicaPipelineSafeModeRule extends public int getCurrentReportedPipelineCount() { return currentReportedPipelineCount; } - -} +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java index a22d162..1bad4cb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java @@ -59,17 +59,17 @@ import org.slf4j.LoggerFactory; * number of datanode registered is met or not. * * 3. HealthyPipelineSafeModeRule: - * Once the pipelineReportHandler processes the - * {@link SCMEvents#PIPELINE_REPORT}, it fires - * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this + * Once the SCMPipelineManager processes the + * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires + * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this * event. This rule processes this report, and check if pipeline is healthy * and increments current healthy pipeline count. Then validate it cutoff * threshold for healthy pipeline is met or not. * * 4. OneReplicaPipelineSafeModeRule: - * Once the pipelineReportHandler processes the - * {@link SCMEvents#PIPELINE_REPORT}, it fires - * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this + * Once the SCMPipelineManager processes the + * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires + * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this * event. This rule processes this report, and add the reported pipeline to * reported pipeline set. Then validate it cutoff threshold for one replica * per pipeline is met or not. 
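Since both rules now key off the same typed event, a test can drive them without synthesizing per-datanode pipeline reports. A minimal sketch of the event flow (pipeline construction and rule registration elided; this mirrors the firePipelineEvent helpers rewritten in the tests later in this patch):

    EventQueue eventQueue = new EventQueue();
    // One OPEN_PIPELINE event per pipeline replaces N per-datanode
    // PipelineReportFromDatanode events under the old scheme.
    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
    eventQueue.processAll(1000L);
    // HealthyPipelineSafeModeRule counts every RATIS pipeline it sees;
    // OneReplicaPipelineSafeModeRule records factor-THREE pipeline IDs.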
@@ -166,6 +166,7 @@ public class SCMSafeModeManager { if (exitRules.get(ruleName) != null) { validatedRules.add(ruleName); + LOG.info("{} rule is successfully validated", ruleName); } else { // This should never happen LOG.error("No Such Exit rule {}", ruleName); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java index b9e5333..6128266 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java @@ -129,7 +129,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> { List<Pipeline> pipelineList = scmPipelineManager.getPipelines(); pipelineList.forEach((pipeline) -> { try { - if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { + if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED && + pipeline.isAllocationTimeout()) { scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false); } } catch (IOException ex) { @@ -142,6 +143,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> { public boolean getSafeModeStatus() { return isInSafeMode.get(); } - - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index 9f6077b..3dbb4cb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher { } if (heartbeat.getCommandStatusReportsCount() != 0) { + LOG.debug("Dispatching Command Status Report."); for (CommandStatusReportsProto commandStatusReport : heartbeat .getCommandStatusReportsList()) { eventPublisher.fireEvent( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 6dd9dab..11c1a44 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -57,6 +57,12 @@ import static org.apache.hadoop.hdds.protocol.proto .Type.closeContainerCommand; import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.closePipelineCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto + .Type.createPipelineCommand; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto .Type.deleteBlocksCommand; import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto @@ -90,6 +96,8 @@ import org.apache.hadoop.ozone.audit.Auditor; import org.apache.hadoop.ozone.audit.SCMAction; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import 
org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; @@ -350,6 +358,18 @@ public class SCMDatanodeProtocolServer implements .setReplicateContainerCommandProto( ((ReplicateContainerCommand)cmd).getProto()) .build(); + case createPipelineCommand: + return builder + .setCommandType(createPipelineCommand) + .setCreatePipelineCommandProto( + ((CreatePipelineCommand)cmd).getProto()) + .build(); + case closePipelineCommand: + return builder + .setCommandType(closePipelineCommand) + .setClosePipelineCommandProto( + ((ClosePipelineCommand)cmd).getProto()) + .build(); default: throw new IllegalArgumentException("Scm command " + cmd.getType().toString() + " is not implemented"); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index c25b3a0..7c2cffa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -356,6 +356,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler); + eventQueue.addHandler(SCMEvents.CREATE_PIPELINE_STATUS, pipelineManager); registerMXBean(); registerMetricsSource(this); } @@ -402,8 +403,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl pipelineManager = configurator.getPipelineManager(); } else { pipelineManager = - new SCMPipelineManager(conf, scmNodeManager, eventQueue, - grpcTlsConfig); + new SCMPipelineManager(conf, scmNodeManager, eventQueue, this); } if (configurator.getContainerManager() != null) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 37321d7..46e8a51 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.server.SCMConfigurator; import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; @@ -41,6 +43,12 @@ import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CreatePipelineStatus; +import org.apache.hadoop.hdds.protocol.proto. 
+ StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto @@ -360,6 +368,32 @@ return new PipelineReportFromDatanode(dn, reportBuilder.build()); } + public static CreatePipelineStatus getPipelineCreateStatusFromDatanode( + DatanodeDetails dn, PipelineID pipelineID) { + CreatePipelineACKProto ack = CreatePipelineACKProto.newBuilder() + .setPipelineID(pipelineID.getProtobuf()) + .setDatanodeUUID(dn.getUuidString()) + .build(); + CommandStatus status = CommandStatus.newBuilder() + .setCreatePipelineAck(ack) + .setStatus(CommandStatus.Status.EXECUTED) + .setCmdId(0L) + .setType(Type.createPipelineCommand) + .build(); + return new CreatePipelineStatus(status); + } + + public static void openAllRatisPipelines(PipelineManager pipelineManager) + throws IOException { + // Pipelines are created by the background creator thread + List<Pipeline> pipelines = + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS); + // Open them directly, standing in for the datanode create-ACK path + for (Pipeline pipeline : pipelines) { + pipelineManager.openPipeline(pipeline.getId()); + } + } + public static PipelineActionsFromDatanode getPipelineActionFromDatanode( DatanodeDetails dn, PipelineID... pipelineIDs) { PipelineActionsProto.Builder actionsProtoBuilder = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index a8364a4..ba3d4f4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; @@ -80,6 +81,8 @@ public class TestCloseContainerEventHandler { eventQueue.addHandler(CLOSE_CONTAINER, new CloseContainerEventHandler(pipelineManager, containerManager)); eventQueue.addHandler(DATANODE_COMMAND, nodeManager); + // Move all pipelines created in the background from ALLOCATED to OPEN + TestUtils.openAllRatisPipelines(pipelineManager); } @AfterClass diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 7657b54..14c24e0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -73,6 +73,7 @@ public class TestDeadNodeHandler { private SCMNodeManager nodeManager; private ContainerManager containerManager; private NodeReportHandler nodeReportHandler; + private SCMPipelineManager pipelineManager; private DeadNodeHandler deadNodeHandler; private EventPublisher publisher; private EventQueue
eventQueue; @@ -87,12 +88,12 @@ public class TestDeadNodeHandler { eventQueue = new EventQueue(); scm = HddsTestUtils.getScm(conf); nodeManager = (SCMNodeManager) scm.getScmNodeManager(); - SCMPipelineManager manager = + pipelineManager = (SCMPipelineManager)scm.getPipelineManager(); PipelineProvider mockRatisProvider = - new MockRatisPipelineProvider(nodeManager, manager.getStateManager(), - conf); - manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + new MockRatisPipelineProvider(nodeManager, + pipelineManager.getStateManager(), conf); + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); containerManager = scm.getContainerManager(); deadNodeHandler = new DeadNodeHandler(nodeManager, @@ -147,6 +148,8 @@ nodeManager.register(TestUtils.randomDatanodeDetails(), TestUtils.createNodeReport(storageOne), null); + TestUtils.openAllRatisPipelines(pipelineManager); + ContainerInfo container1 = TestUtils.allocateContainer(containerManager); ContainerInfo container2 = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index d028851..dd61102 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -121,6 +121,7 @@ public class TestSCMNodeManager { testDir.getAbsolutePath()); conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS); + conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); return conf; } @@ -1035,9 +1036,11 @@ public class TestSCMNodeManager { eq.processAll(1000L); List<SCMCommand> command = nodemanager.processHeartbeat(datanodeDetails); - Assert.assertEquals(1, command.size()); - Assert - .assertEquals(command.get(0).getClass(), CloseContainerCommand.class); + // With the datanode registered, SCM may also send a create pipeline command + Assert.assertTrue(command.size() >= 1); + Assert.assertTrue(command.get(0).getClass().equals( + CloseContainerCommand.class) || + command.get(1).getClass().equals(CloseContainerCommand.class)); } catch (IOException e) { e.printStackTrace(); throw e; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java index 01c53ba..28a3484 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventQueue; import java.io.IOException; @@ -31,7 +32,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider { public MockRatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, Configuration conf) { - super(nodeManager, stateManager, conf, null); + super(nodeManager, stateManager, conf, new EventQueue()); } protected void initializePipeline(Pipeline pipeline) throws IOException { diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java index 94c3039..639fc9a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java @@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.MockNodeManager; @@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; @@ -68,7 +63,8 @@ public class TestHealthyPipelineSafeModeRule { // enable pipeline check config.setBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true); - + config.setBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, nodeManager, eventQueue, null); @@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule { } finally { FileUtil.fullyDelete(new File(storageDir)); } - } - @Test public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception { @@ -113,7 +107,8 @@ public class TestHealthyPipelineSafeModeRule { // enable pipeline check config.setBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true); - + config.setBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, nodeManager, eventQueue, null); @@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule { } finally { FileUtil.fullyDelete(new File(storageDir)); } - } @@ -188,7 +182,8 @@ public class TestHealthyPipelineSafeModeRule { // enable pipeline check config.setBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true); - + config.setBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, nodeManager, eventQueue, null); @@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule { scmSafeModeManager.getHealthyPipelineSafeModeRule(); - // No datanodes have sent pipelinereport from datanode + // No pipeline event has been sent to the SCMSafeModeManager yet Assert.assertFalse(healthyPipelineSafeModeRule.validate()); @@ -225,14 +220,14 @@ GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger( SCMSafeModeManager.class)); - // fire event with pipeline report with ratis type and factor 1 + // fire event with pipeline create status
with ratis type and factor 1 // pipeline, validate() should return true firePipelineEvent(pipeline1, eventQueue); GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains( - "reported count is 0"), + "reported count is 1"), 1000, 5000); - Assert.assertFalse(healthyPipelineSafeModeRule.validate()); + Assert.assertTrue(healthyPipelineSafeModeRule.validate()); firePipelineEvent(pipeline2, eventQueue); firePipelineEvent(pipeline3, eventQueue); @@ -246,19 +241,7 @@ public class TestHealthyPipelineSafeModeRule { } - private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) { - PipelineReportsProto.Builder reportBuilder = PipelineReportsProto - .newBuilder(); - - reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); - - // Here no need to fire event from 3 nodes, as already pipeline is in - // open state, but doing it. - eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( - pipeline.getNodes().get(0), reportBuilder.build())); + eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); } - }
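Both safemode rule tests now drive the rules with a single OPEN_PIPELINE event instead of synthesizing per-datanode pipeline reports. As a hedged illustration of the event-queue pattern the tests rely on (the counting handler below is a toy stand-in for the real rule, OPEN_PIPELINE is the event introduced by this patch, and a Pipeline object named pipeline is assumed to exist as in the tests above):

    // Illustrative only: a subscriber that counts OPEN_PIPELINE events,
    // standing in for HealthyPipelineSafeModeRule (the real rule compares
    // the reported count against a configured threshold).
    EventQueue eventQueue = new EventQueue();
    AtomicInteger reported = new AtomicInteger();
    eventQueue.addHandler(SCMEvents.OPEN_PIPELINE,
        (pipeline, publisher) -> reported.incrementAndGet());
    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
    eventQueue.processAll(1000L); // deliver queued events, as the tests do

diff --git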
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java index ca54d05..7f8f0db 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java @@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.MockNodeManager; @@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; @@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule { HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true); ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().toString()); + ozoneConfiguration.setBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); List<ContainerInfo> containers = new ArrayList<>(); containers.addAll(HddsTestUtils.getContainerInfo(1)); @@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule { firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1)); GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000); - } @@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule { firePipelineEvent(pipelines.get(pipelineCountThree - 1)); GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000); - } - - private void createPipelines(int count, HddsProtos.ReplicationFactor factor) throws Exception { for (int i = 0; i < count; i++) { @@ -184,26 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule { } private void firePipelineEvent(Pipeline pipeline) { - PipelineReportsProto.Builder reportBuilder = - PipelineReportsProto.newBuilder(); - - reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); - - if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) { - eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( - pipeline.getNodes().get(0), reportBuilder.build())); - eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( - pipeline.getNodes().get(1), reportBuilder.build())); - eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( - pipeline.getNodes().get(2), reportBuilder.build())); - } else { - eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode( - pipeline.getNodes().get(0), reportBuilder.build())); - } + eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index ba92035..0de0a73 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -31,8 +31,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.MockNodeManager; @@ -42,7 +40,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; @@ -175,7 +172,7 @@ public class TestSCMSafeModeManager { HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent); conf.setDouble(HddsConfigKeys. HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent); - + conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); return conf; } @@ -300,12 +297,12 @@ public class TestSCMSafeModeManager { // we shall get an event when a datanode is registered. In that case, // validate will return true, and add this to validatedRules.
if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) { - firePipelineEvent(pipelines.get(0)); + firePipelineEvent(pipelineManager, pipelines.get(0)); } for (int i = 0; i < Math.max(healthyPipelineThresholdCount, - oneReplicaThresholdCount); i++) { - firePipelineEvent(pipelines.get(i)); + Math.min(oneReplicaThresholdCount, pipelines.size())); i++) { + firePipelineEvent(pipelineManager, pipelines.get(i)); if (i < healthyPipelineThresholdCount) { checkHealthy(i + 1); @@ -350,15 +347,11 @@ public class TestSCMSafeModeManager { 1000, 5000); } - private void firePipelineEvent(Pipeline pipeline) throws Exception { - PipelineReportsProto.Builder reportBuilder = - PipelineReportsProto.newBuilder(); - - reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); - queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new PipelineReportFromDatanode(pipeline.getNodes().get(0), - reportBuilder.build())); + private void firePipelineEvent(SCMPipelineManager pipelineManager, + Pipeline pipeline) throws Exception { + pipelineManager.openPipeline(pipeline.getId()); + queue.fireEvent(SCMEvents.OPEN_PIPELINE, + pipelineManager.getPipeline(pipeline.getId())); } @@ -488,10 +481,6 @@ public class TestSCMSafeModeManager { Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); - PipelineReportsProto.Builder reportBuilder = PipelineReportsProto - .newBuilder(); - reportBuilder.addPipelineReport(PipelineReport.newBuilder() - .setPipelineID(pipeline.getId().getProtobuf())); scmSafeModeManager = new SCMSafeModeManager( config, containers, pipelineManager, queue); @@ -500,10 +489,9 @@ public class TestSCMSafeModeManager { HddsTestUtils.createNodeRegistrationContainerReport(containers)); assertTrue(scmSafeModeManager.getInSafeMode()); - // Trigger the processed pipeline report event - queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT, - new PipelineReportFromDatanode(pipeline.getNodes().get(0), - reportBuilder.build())); + + + firePipelineEvent(pipelineManager, pipeline); GenericTestUtils.waitFor(() -> { return !scmSafeModeManager.getInSafeMode(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java index e4f1a37..378a1a6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java @@ -164,7 +164,9 @@ public class TestContainerStateManagerIntegration { } } - cluster.restartStorageContainerManager(true); + // Restarting SCM will not trigger container reports to satisfy the safe mode + // exit rule.
+ cluster.restartStorageContainerManager(false); List<ContainerInfo> result = cluster.getStorageContainerManager() .getContainerManager().listContainer(null, 100); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index c583559..210dbb2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -81,7 +81,7 @@ public class TestPipelineClose { cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build(); conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000, TimeUnit.MILLISECONDS); - pipelineDestroyTimeoutInMillis = 5000; + pipelineDestroyTimeoutInMillis = 10000; conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS); cluster.waitForClusterToBeReady(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 00144e4..dedc56a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -45,7 +46,9 @@ public class TestRatisPipelineProvider { @Before public void init() throws Exception { nodeManager = new MockNodeManager(true, 10); - stateManager = new PipelineStateManager(new OzoneConfiguration()); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); + stateManager = new PipelineStateManager(conf); provider = new MockRatisPipelineProvider(nodeManager, stateManager, new OzoneConfiguration()); } @@ -57,7 +60,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); Pipeline pipeline1 = provider.create(factor); stateManager.addPipeline(pipeline1); @@ -68,7 +71,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline1.getFactor(), factor); Assert.assertEquals(pipeline1.getPipelineState(), - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); } @@ -80,7 +83,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; @@ -94,7 +97,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline1.getFactor(), factor); Assert.assertEquals(pipeline1.getPipelineState(), - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); } @@ -183,7 +186,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.OPEN); + Pipeline.PipelineState.ALLOCATED); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
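The ALLOCATED assertions above reflect the new two-phase lifecycle: RatisPipelineProvider now returns pipelines in ALLOCATED state, and a pipeline only becomes OPEN after the datanodes confirm creation. A hedged sketch of that flow, using only the manager calls visible in these hunks:

    // Sketch only: create -> ALLOCATED, then open once datanodes confirm.
    Pipeline pipeline = pipelineManager.createPipeline(
        HddsProtos.ReplicationType.RATIS,
        HddsProtos.ReplicationFactor.THREE);
    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
        pipeline.getPipelineState());
    // In production the open happens when every datanode in the pipeline
    // reports a successful create pipeline command; tests shortcut it:
    pipelineManager.openPipeline(pipeline.getId());
    Assert.assertTrue(
        pipelineManager.getPipeline(pipeline.getId()).isOpen());

diff --git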
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 2a486b1..2e1fe9c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -28,10 +28,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CreatePipelineStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; @@ -42,7 +42,6 @@ import org.junit.Test; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -61,6 +60,8 @@ public class TestSCMPipelineManager { testDir = GenericTestUtils .getTestDir(TestSCMPipelineManager.class.getSimpleName()); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, + "false"); boolean folderExisted = testDir.exists() || testDir.mkdirs(); if (!folderExisted) { throw new IOException("Unable to create test directory path"); } @@ -157,30 +158,24 @@ public class TestSCMPipelineManager { pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); - SCMSafeModeManager scmSafeModeManager = - new SCMSafeModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, eventQueue); - // create a pipeline in allocated state with no dns yet reported Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + Assert .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); +
.assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - // get pipeline report from each dn in the pipeline - PipelineReportHandler pipelineReportHandler = - new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); + // get pipeline create status from each dn in the pipeline for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); // pipeline is not healthy until all dns report Assert.assertFalse( pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); + CreatePipelineStatus response = + TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId()); + pipelineManager.onMessage(response, eventQueue); } // pipeline is healthy when all dns report @@ -194,11 +189,10 @@ public class TestSCMPipelineManager { pipelineManager.finalizeAndDestroyPipeline(pipeline, false); for (DatanodeDetails dn: pipeline.getNodes()) { - PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); - // pipeline report for destroyed pipeline should be ignored - pipelineReportHandler - .onMessage(pipelineReportFromDatanode, new EventQueue()); + CreatePipelineStatus response = + TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId()); + // pipeline create status for destroyed pipeline should be ignored + pipelineManager.onMessage(response, new EventQueue()); } try { @@ -226,9 +220,9 @@ public class TestSCMPipelineManager { MetricsRecordBuilder metrics = getMetrics( SCMPipelineMetrics.class.getSimpleName()); - long numPipelineCreated = getLongCounter("NumPipelineCreated", + long numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); - Assert.assertTrue(numPipelineCreated == 0); + Assert.assertTrue(numPipelineAllocated == 0); // 3 DNs are unhealthy. 
// Create 5 pipelines (Use up 15 Datanodes) @@ -241,8 +235,8 @@ public class TestSCMPipelineManager { metrics = getMetrics( SCMPipelineMetrics.class.getSimpleName()); - numPipelineCreated = getLongCounter("NumPipelineCreated", metrics); - Assert.assertTrue(numPipelineCreated == 5); + numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); + Assert.assertTrue(numPipelineAllocated == 5); long numPipelineCreateFailed = getLongCounter( "NumPipelineCreationFailed", metrics); @@ -261,8 +255,8 @@ public class TestSCMPipelineManager { metrics = getMetrics( SCMPipelineMetrics.class.getSimpleName()); - numPipelineCreated = getLongCounter("NumPipelineCreated", metrics); - Assert.assertTrue(numPipelineCreated == 5); + numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics); + Assert.assertTrue(numPipelineAllocated == 5); numPipelineCreateFailed = getLongCounter( "NumPipelineCreationFailed", metrics); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index ac76482..9845abb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -65,6 +65,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState .HEALTHY; @@ -143,11 +144,16 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(() -> { final int healthy = scm.getNodeCount(HEALTHY); - final boolean isReady = healthy == hddsDatanodes.size(); + final boolean isNodeReady = healthy == hddsDatanodes.size(); + final boolean exitSafeMode = !scm.isInSafeMode(); + LOG.info("{}. Got {} of {} DN Heartbeats.", - isReady? "Cluster is ready" : "Waiting for cluster to be ready", + isNodeReady ? "Nodes are ready" : "Waiting for nodes to be ready", + healthy, hddsDatanodes.size()); + LOG.info(exitSafeMode ? "Cluster has exited safe mode" : + "Waiting for cluster to exit safe mode"); - return isReady; + return isNodeReady && exitSafeMode; }, 1000, waitForClusterToBeReadyTimeout); } @@ -615,11 +621,15 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { if (hbInterval.isPresent()) { conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, hbInterval.get(), TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, + hbInterval.get(), TimeUnit.MILLISECONDS); } else { conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, + DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS); } if (hbProcessorInterval.isPresent()) {
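For anyone reproducing these tests locally, the configuration surface this patch leans on is small. An illustrative summary in code; the values mirror the ones used in the hunks above and are not new defaults:

    // Illustrative settings only -- mirrors what the tests above configure.
    OzoneConfiguration conf = new OzoneConfiguration();
    // Let unit tests opt out of SCM's background pipeline creation.
    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
    // Keep the pipeline availability safe mode rules on where they are tested.
    conf.setBoolean(
        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
    // Command status reports ride the heartbeat cadence so that create/close
    // pipeline acknowledgements reach SCM quickly.
    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1000, TimeUnit.MILLISECONDS);
    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 1000,
        TimeUnit.MILLISECONDS);

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org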