This is an automated email from the ASF dual-hosted git repository. elek pushed a commit to branch HDDS-2034 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 11aa210b7dd8277ac022a530d1925cfd47be1e97 Author: Sammi Chen <sammic...@apache.org> AuthorDate: Wed Oct 9 18:15:17 2019 +0800 Update per comments --- hadoop-hdds/container-service/pom.xml | 5 - .../common/report/PipelineReportPublisher.java | 1 + .../common/statemachine/StateContext.java | 10 -- .../ClosePipelineCommandHandler.java | 63 ++---------- .../CreatePipelineCommandHandler.java | 111 ++------------------- .../common/transport/server/XceiverServerSpi.java | 18 ++++ .../transport/server/ratis/XceiverServerRatis.java | 36 +++++++ .../commands/CreatePipelineCommandStatus.java | 97 ------------------ .../proto/StorageContainerDatanodeProtocol.proto | 8 -- .../scm/command/CommandStatusReportHandler.java | 6 -- .../apache/hadoop/hdds/scm/events/SCMEvents.java | 10 +- .../hadoop/hdds/scm/pipeline/PipelineManager.java | 5 +- .../hdds/scm/pipeline/PipelineReportHandler.java | 7 +- .../hdds/scm/pipeline/SCMPipelineManager.java | 85 +++------------- .../hdds/scm/pipeline/SCMPipelineMetrics.java | 1 - .../scm/safemode/HealthyPipelineSafeModeRule.java | 1 - .../safemode/OneReplicaPipelineSafeModeRule.java | 48 +++++---- .../hdds/scm/server/StorageContainerManager.java | 3 +- .../java/org/apache/hadoop/hdds/scm/TestUtils.java | 21 ---- .../container/TestCloseContainerEventHandler.java | 2 +- .../scm/container/TestSCMContainerManager.java | 2 +- .../hdds/scm/node/TestContainerPlacement.java | 2 +- .../safemode/TestHealthyPipelineSafeModeRule.java | 6 +- .../TestOneReplicaPipelineSafeModeRule.java | 2 +- .../hdds/scm/safemode/TestSCMSafeModeManager.java | 10 +- .../hdds/scm/pipeline/TestSCMPipelineManager.java | 45 +++++---- .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 6 -- 27 files changed, 163 insertions(+), 448 deletions(-) diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml index bb85f3c..2f89fa2 100644 --- a/hadoop-hdds/container-service/pom.xml +++ b/hadoop-hdds/container-service/pom.xml @@ -38,11 +38,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>hadoop-hdds-server-framework</artifactId> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds-client</artifactId> - <version>${hdds.version}</version> - </dependency> - <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> </dependency> diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java index e7f4347..e1dd098 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java @@ -68,6 +68,7 @@ public class PipelineReportPublisher extends @Override protected PipelineReportsProto getReport() { + System.out.println("Pipeline Report Generate"); return getContext().getParent().getContainer().getPipelineReport(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 17e5502..2c01f3a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -37,8 +37,6 @@ import org.apache.hadoop.ozone.container.common.states.datanode import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder; -import org.apache.hadoop.ozone.protocol.commands - .CreatePipelineCommandStatus.CreatePipelineCommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import static java.lang.Math.min; @@ -466,14 +464,6 @@ public class StateContext { .setType(cmd.getType()) .build()); } - if (cmd.getType() == SCMCommandProto.Type.createPipelineCommand) { - addCmdStatus(cmd.getId(), - CreatePipelineCommandStatusBuilder.newBuilder() - .setCmdId(cmd.getId()) - .setStatus(Status.PENDING) - .setType(cmd.getType()) - .build()); - } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index bd48839..a31387a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -16,33 +16,21 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.ratis.RatisHelper; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.security.x509.SecurityConfig; -import org.apache.hadoop.hdds.security.x509.certificate.client - .CertificateClient; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.grpc.GrpcTlsConfig; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,24 +67,23 @@ public class ClosePipelineCommandHandler implements CommandHandler { StateContext context, SCMConnectionManager connectionManager) { invocationCount.incrementAndGet(); final long startTime = Time.monotonicNow(); - final DatanodeDetails datanode = context.getParent() - .getDatanodeDetails(); + final DatanodeDetails dn = context.getParent().getDatanodeDetails(); final ClosePipelineCommandProto closeCommand = ((ClosePipelineCommand)command).getProto(); - final PipelineID pipelineID = PipelineID.getFromProtobuf( - closeCommand.getPipelineID()); + final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID(); try { - destroyPipeline(datanode, pipelineID, context); + XceiverServerSpi server = ozoneContainer.getWriteChannel(); + server.removeGroup(pipelineID); + context.getParent().triggerHeartbeat(); LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID, - datanode.getUuidString()); + dn.getUuidString()); } catch (IOException e) { LOG.error("Can't close pipeline #{}", pipelineID, e); } finally { long endTime = Time.monotonicNow(); totalTime += endTime - startTime; } - } /** @@ -131,36 +118,4 @@ public class ClosePipelineCommandHandler implements CommandHandler { } return 0; } - - /** - * Destroy pipeline on this datanode. - * - * @param dn - Datanode on which pipeline needs to be destroyed - * @param pipelineID - ID of pipeline to be destroyed - * @param context - Ozone datanode context - * @throws IOException - */ - void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, - StateContext context) throws IOException { - final Configuration ozoneConf = context.getParent().getConf(); - final String rpcType = ozoneConf - .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - final RaftPeer p = RatisHelper.toRaftPeer(dn); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(ozoneConf); - final CertificateClient dnCertClient = - context.getParent().getCertificateClient(); - final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN( - new SecurityConfig(ozoneConf), dnCertClient); - final TimeDuration requestTimeout = - RatisHelper.getClientRequestTimeout(ozoneConf); - try(RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, - retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) { - client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), - true, p.getId()); - } - } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index e6d20e7..3a60d7e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -16,52 +16,28 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; -import org.apache.hadoop.hdds.ratis.RatisHelper; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.security.x509.SecurityConfig; -import org.apache.hadoop.hdds.security.x509.certificate.client - .CertificateClient; -import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; -import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommandStatus; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -98,37 +74,27 @@ public class CreatePipelineCommandHandler implements CommandHandler { .getDatanodeDetails(); final CreatePipelineCommandProto createCommand = ((CreatePipelineCommand)command).getProto(); - final PipelineID pipelineID = PipelineID.getFromProtobuf( - createCommand.getPipelineID()); + final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID(); Collection<DatanodeDetails> peers = createCommand.getDatanodeList().stream() .map(DatanodeDetails::getFromProtoBuf) .collect(Collectors.toList()); - final CreatePipelineACKProto createPipelineACK = - CreatePipelineACKProto.newBuilder() - .setPipelineID(createCommand.getPipelineID()) - .setDatanodeUUID(dn.getUuidString()).build(); - boolean success = false; try { - createPipeline(dn, pipelineID, peers, context); - success = true; + XceiverServerSpi server = ozoneContainer.getWriteChannel(); + server.addGroup(pipelineID, peers); LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.", createCommand.getType(), createCommand.getFactor(), pipelineID, dn.getUuidString()); + // Trigger heartbeat report + context.addReport(context.getParent().getContainer().getPipelineReport()); + context.getParent().triggerHeartbeat(); } catch (NotLeaderException e) { LOG.debug("Follower cannot create pipeline #{}.", pipelineID); } catch (IOException e) { LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(), createCommand.getFactor(), pipelineID, e); } finally { - final boolean cmdExecuted = success; - Consumer<CommandStatus> statusUpdater = (cmdStatus) -> { - cmdStatus.setStatus(cmdExecuted); - ((CreatePipelineCommandStatus)cmdStatus) - .setCreatePipelineAck(createPipelineACK); - }; - updateCommandStatus(context, command, statusUpdater, LOG); long endTime = Time.monotonicNow(); totalTime += endTime - startTime; } @@ -166,63 +132,4 @@ public class CreatePipelineCommandHandler implements CommandHandler { } return 0; } - - /** - * Sends ratis command to create pipeline on this datanode. - * - * @param dn - this datanode - * @param pipelineId - pipeline ID - * @param peers - datanodes of the pipeline - * @param context - Ozone datanode context - * @throws IOException - */ - private void createPipeline(DatanodeDetails dn, PipelineID pipelineId, - Collection<DatanodeDetails> peers, StateContext context) - throws IOException { - final Configuration ozoneConf = context.getParent().getConf(); - final RaftGroup group = RatisHelper.newRaftGroup( - RaftGroupId.valueOf(pipelineId.getId()), peers); - LOG.debug("creating pipeline:#{} with {}", pipelineId, group); - - final String rpcType = ozoneConf - .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - final List< IOException > exceptions = - Collections.synchronizedList(new ArrayList<>()); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(ozoneConf); - final CertificateClient dnCertClient = - context.getParent().getCertificateClient(); - final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN( - new SecurityConfig(ozoneConf), dnCertClient); - final TimeDuration requestTimeout = - RatisHelper.getClientRequestTimeout(ozoneConf); - try { - final RaftPeer peer = RatisHelper.toRaftPeer(dn); - try (RaftClient client = RatisHelper - .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), peer, - retryPolicy, maxOutstandingRequests, tlsConfig, - requestTimeout)) { - - RaftClientReply reply = client.groupAdd(group, peer.getId()); - if (reply == null || !reply.isSuccess()) { - String msg = "Pipeline initialization failed for pipeline:" - + pipelineId.getId() + " node:" + peer.getId(); - throw new IOException(msg); - } - } catch (IOException ioe) { - String errMsg = - "Failed invoke Ratis rpc for " + dn.getUuid(); - exceptions.add(new IOException(errMsg, ioe)); - } - } catch (RejectedExecutionException ex) { - throw new IOException(ex.getClass().getName() + " exception occurred " + - "during createPipeline", ex); - } - - if (!exceptions.isEmpty()) { - throw MultipleIOException.createIOException(exceptions); - } - } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index 4e0d343..01f463c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.transport.server; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; import java.io.IOException; +import java.util.Collection; import java.util.List; /** A server endpoint that acts as the communication layer for Ozone @@ -60,6 +62,22 @@ public interface XceiverServerSpi { */ boolean isExist(HddsProtos.PipelineID pipelineId); + + /** + * Join a new pipeline. + */ + default void addGroup(HddsProtos.PipelineID pipelineId, + Collection<DatanodeDetails> peers) throws IOException { + } + + + /** + * Exit a pipeline. + */ + default void removeGroup(HddsProtos.PipelineID pipelineId) + throws IOException { + } + /** * Get pipeline report for the XceiverServer instance. * @return list of report for each pipeline. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 746bfb8..ba9a5fd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -610,6 +611,41 @@ public final class XceiverServerRatis extends XceiverServer { return pipelineIDs; } + @Override + public void addGroup(HddsProtos.PipelineID pipelineId, + Collection<DatanodeDetails> peers) throws IOException { + final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId); + final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId()); + final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers); + GroupManagementRequest request = GroupManagementRequest.newAdd( + clientId, server.getId(), nextCallId(), group); + + RaftClientReply reply; + try { + reply = server.groupManagement(request); + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + processReply(reply); + } + + @Override + public void removeGroup(HddsProtos.PipelineID pipelineId) + throws IOException { + GroupManagementRequest request = GroupManagementRequest.newRemove( + clientId, server.getId(), nextCallId(), + RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()), + true); + + RaftClientReply reply; + try { + reply = server.groupManagement(request); + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + processReply(reply); + } + void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) { handlePipelineFailure(groupId, roleInfoProto); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java deleted file mode 100644 index b5e7d6c..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.protocol.commands; - -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -/** - * Command status to report about pipeline creation. - */ -public class CreatePipelineCommandStatus extends CommandStatus { - - private CreatePipelineACKProto createPipelineAck; - - public CreatePipelineCommandStatus(Type type, Long cmdId, Status status, - String msg, CreatePipelineACKProto ack) { - super(type, cmdId, status, msg); - this.createPipelineAck = ack; - } - - public void setCreatePipelineAck(CreatePipelineACKProto ack) { - createPipelineAck = ack; - } - - @Override - public CommandStatus getFromProtoBuf( - StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) { - return CreatePipelineCommandStatusBuilder.newBuilder() - .setCreatePipelineAck(cmdStatusProto.getCreatePipelineAck()) - .setCmdId(cmdStatusProto.getCmdId()) - .setStatus(cmdStatusProto.getStatus()) - .setType(cmdStatusProto.getType()) - .setMsg(cmdStatusProto.getMsg()) - .build(); - } - - @Override - public StorageContainerDatanodeProtocolProtos.CommandStatus - getProtoBufMessage() { - StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder = - StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder() - .setCmdId(this.getCmdId()) - .setStatus(this.getStatus()) - .setType(this.getType()); - if (createPipelineAck != null) { - builder.setCreatePipelineAck(createPipelineAck); - } - if (this.getMsg() != null) { - builder.setMsg(this.getMsg()); - } - return builder.build(); - } - - /** - * Builder for CreatePipelineCommandStatus. - */ - public static final class CreatePipelineCommandStatusBuilder - extends CommandStatusBuilder { - private CreatePipelineACKProto createPipelineAck = null; - - public static CreatePipelineCommandStatusBuilder newBuilder() { - return new CreatePipelineCommandStatusBuilder(); - } - - public CreatePipelineCommandStatusBuilder setCreatePipelineAck( - CreatePipelineACKProto ack) { - this.createPipelineAck = ack; - return this; - } - - @Override - public CommandStatus build() { - return new CreatePipelineCommandStatus(getType(), getCmdId(), getStatus(), - getMsg(), createPipelineAck); - } - } -} diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 90124c6..69d22f6 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -177,7 +177,6 @@ message CommandStatus { required SCMCommandProto.Type type = 3; optional string msg = 4; optional ContainerBlocksDeletionACKProto blockDeletionAck = 5; - optional CreatePipelineACKProto createPipelineAck = 6; } message ContainerActionsProto { @@ -335,13 +334,6 @@ message CreatePipelineCommandProto { required int64 cmdId = 5; } -// ACK message datanode sent to SCM, contains the result of -// pipeline creation status. -message CreatePipelineACKProto { - required PipelineID pipelineID = 1; - required string datanodeUUID = 2; -} - /** This command asks the datanode to close a pipeline. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index aad5573..51b8ea5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -60,12 +60,6 @@ public class CommandStatusReportHandler implements new DeleteBlockStatus(cmdStatus)); } break; - case createPipelineCommand: - if (cmdStatus.getStatus() != CommandStatus.Status.PENDING) { - publisher.fireEvent(SCMEvents.CREATE_PIPELINE_STATUS, - new CreatePipelineStatus(cmdStatus)); - } - break; default: LOGGER.warn("CommandStatus of type:{} not handled in " + "CommandStatusReportHandler.", cmdStatus.getType()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index dd476a7..02e922e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -100,7 +100,7 @@ public final class SCMEvents { new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report"); /** - * Open pipeline event sent by ScmPipelineManager. This event is + * Open pipeline event sent by PipelineReportHandler. This event is * received by HealthyPipelineSafeModeRule. */ public static final TypedEvent<Pipeline> @@ -199,14 +199,6 @@ public final class SCMEvents { new TypedEvent<>(SafeModeStatus.class); /** - * This event is triggered by CommandStatusReportHandler whenever a - * status for CreatePipeline SCMCommand is received. - */ - public static final TypedEvent<CreatePipelineStatus> - CREATE_PIPELINE_STATUS = - new TypedEvent<>(CreatePipelineStatus.class, "Create_Pipeline_Status"); - - /** * Private Ctor. Never Constructed. */ private SCMEvents() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index e477860..a74cab5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -21,9 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.server.events.EventHandler; import java.io.Closeable; import java.io.IOException; @@ -34,8 +32,7 @@ import java.util.NavigableSet; /** * Interface which exposes the api for pipeline management. */ -public interface PipelineManager extends Closeable, PipelineManagerMXBean, - EventHandler<CommandStatusReportHandler.CreatePipelineStatus> { +public interface PipelineManager extends Closeable, PipelineManagerMXBean { Pipeline createPipeline(ReplicationType type, ReplicationFactor factor) throws IOException; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java index 6b9a839..e7af043 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java @@ -104,10 +104,9 @@ public class PipelineReportHandler implements if (pipeline.isHealthy()) { // if all the dns have reported, pipeline can be moved to OPEN state pipelineManager.openPipeline(pipelineID); - } else { - if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) { - publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); - } + } + if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) { + publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); } } else { // In OPEN state case just report the datanode diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 405baa8..f19ae21 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -27,18 +27,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .CreatePipelineStatus; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; @@ -52,6 +45,7 @@ import javax.management.ObjectName; import java.io.File; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -88,14 +82,14 @@ public class SCMPipelineManager implements PipelineManager { private final NodeManager nodeManager; private final SCMPipelineMetrics metrics; private final Configuration conf; - private final StorageContainerManager scm; private boolean pipelineAvailabilityCheck; private boolean createPipelineInSafemode; + private Set<PipelineID> oldRatisThreeFactorPipelineIDSet = new HashSet<>(); // Pipeline Manager MXBean private ObjectName pmInfoBean; public SCMPipelineManager(Configuration conf, NodeManager nodeManager, - EventPublisher eventPublisher, StorageContainerManager scm) + EventPublisher eventPublisher) throws IOException { this.lock = new ReentrantReadWriteLock(); this.conf = conf; @@ -122,7 +116,6 @@ public class SCMPipelineManager implements PipelineManager { this.metrics = SCMPipelineMetrics.create(); this.pmInfoBean = MBeans.register("SCMPipelineManager", "SCMPipelineManagerInfo", this); - this.scm = scm; this.pipelineAvailabilityCheck = conf.getBoolean( HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT); @@ -142,6 +135,10 @@ public class SCMPipelineManager implements PipelineManager { pipelineFactory.setProvider(replicationType, provider); } + public Set<PipelineID> getOldPipelineIdSet() { + return oldRatisThreeFactorPipelineIDSet; + } + private void initializePipelineState() throws IOException { if (pipelineStore.isEmpty()) { LOG.info("No pipeline exists in current db"); @@ -162,6 +159,10 @@ public class SCMPipelineManager implements PipelineManager { Preconditions.checkNotNull(pipeline); stateManager.addPipeline(pipeline); nodeManager.addPipeline(pipeline); + if (pipeline.getType() == ReplicationType.RATIS && + pipeline.getFactor() == ReplicationFactor.THREE) { + oldRatisThreeFactorPipelineIDSet.add(pipeline.getId()); + } } } @@ -324,9 +325,8 @@ public class SCMPipelineManager implements PipelineManager { lock.writeLock().lock(); try { Pipeline pipeline = stateManager.openPipeline(pipelineId); - if (pipelineAvailabilityCheck && scm != null && scm.isInSafeMode()) { - eventPublisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline); - } + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); } finally { lock.writeLock().unlock(); } @@ -494,63 +494,4 @@ public class SCMPipelineManager implements PipelineManager { // shutdown pipeline provider. pipelineFactory.shutdown(); } - - @Override - public void onMessage(CreatePipelineStatus response, - EventPublisher publisher) { - CommandStatus result = response.getCmdStatus(); - CreatePipelineACKProto ack = result.getCreatePipelineAck(); - PipelineID pipelineID = PipelineID.getFromProtobuf(ack.getPipelineID()); - CommandStatus.Status status = result.getStatus(); - LOG.info("receive pipeline {} create status {}", pipelineID, status); - Pipeline pipeline; - try { - pipeline = stateManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - LOG.warn("Pipeline {} cannot be found", pipelineID); - return; - } - - switch (status) { - case EXECUTED: - DatanodeDetails dn = nodeManager.getNodeByUuid(ack.getDatanodeUUID()); - if (dn == null) { - LOG.warn("Datanode {} for Pipeline {} cannot be found", - ack.getDatanodeUUID(), pipelineID); - return; - } - try { - pipeline.reportDatanode(dn); - } catch (IOException e) { - LOG.warn("Update {} for Pipeline {} failed for {}", - dn.getUuidString(), pipelineID, e.getMessage()); - return; - } - // If all datanodes are updated, we believe pipeline is ready to OPEN - if (pipeline.isHealthy() || - pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { - try { - openPipeline(pipelineID); - } catch (IOException e) { - LOG.warn("Fail to open Pipeline {} for {}", pipelineID, - e.getMessage()); - return; - } - metrics.incNumPipelineCreated(); - metrics.createPerPipelineMetrics(pipeline); - } - return; - case FAILED: - metrics.incNumPipelineCreationFailed(); - try { - finalizeAndDestroyPipeline(pipeline, false); - } catch (IOException e) { - LOG.warn("Fail to close Pipeline {} for {}", pipelineID, - e.getMessage()); - } - return; - default: - LOG.error("Unknown or unexpected status {}", status); - } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java index fa91572..cd80010 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java @@ -96,7 +96,6 @@ public final class SCMPipelineMetrics implements MetricsSource { } void createPerPipelineMetrics(Pipeline pipeline) { - System.out.println("add pipeline " + pipeline.getId() + " to metrics map"); numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns .info(getBlockAllocationMetricName(pipeline), "Number of blocks allocated in pipeline " + pipeline.getId()), 0L)); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 5304270..7ee8670 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -101,7 +101,6 @@ public class HealthyPipelineSafeModeRule @Override protected boolean validate() { if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) { - LOG.info("{} rule satisfied", this.getClass().getSimpleName()); return true; } return false; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java index 34ba35c..2138bf2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.TypedEvent; import org.slf4j.Logger; @@ -47,7 +48,11 @@ public class OneReplicaPipelineSafeModeRule extends private int thresholdCount; private Set<PipelineID> reportedPipelineIDSet = new HashSet<>(); - private int currentReportedPipelineCount = 0; + private Set<PipelineID> oldPipelineIDSet; + private int oldPipelineReportedCount = 0; + private int oldPipelineThresholdCount = 0; + private int newPipelineThresholdCount = 0; + private int newPipelineReportedCount = 0; public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue, @@ -66,16 +71,19 @@ public class OneReplicaPipelineSafeModeRule extends HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT + " value should be >= 0.0 and <= 1.0"); - // Exclude CLOSED pipeline + oldPipelineIDSet = + ((SCMPipelineManager)pipelineManager).getOldPipelineIdSet(); int totalPipelineCount = pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) - .size() + - pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, - HddsProtos.ReplicationFactor.THREE, - Pipeline.PipelineState.ALLOCATED).size(); + HddsProtos.ReplicationFactor.THREE).size(); + Preconditions.checkState(totalPipelineCount >= oldPipelineIDSet.size()); - thresholdCount = (int) Math.ceil(percent * totalPipelineCount); + oldPipelineThresholdCount = + (int) Math.ceil(percent * oldPipelineIDSet.size()); + newPipelineThresholdCount = (int) Math.ceil( + percent * (totalPipelineCount - oldPipelineIDSet.size())); + + thresholdCount = oldPipelineThresholdCount + newPipelineThresholdCount; LOG.info("Total pipeline count is {}, pipeline's with at least one " + "datanode reported threshold count is {}", totalPipelineCount, @@ -92,8 +100,7 @@ public class OneReplicaPipelineSafeModeRule extends @Override protected boolean validate() { - if (currentReportedPipelineCount >= thresholdCount) { - LOG.info("{} rule satisfied", this.getClass().getSimpleName()); + if (newPipelineReportedCount + oldPipelineReportedCount >= thresholdCount) { return true; } return false; @@ -105,19 +112,26 @@ public class OneReplicaPipelineSafeModeRule extends if (pipeline.getType() == HddsProtos.ReplicationType.RATIS && pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && !reportedPipelineIDSet.contains(pipeline.getId())) { - reportedPipelineIDSet.add(pipeline.getId()); - getSafeModeMetrics() - .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount(); + if (oldPipelineIDSet.contains(pipeline.getId()) && + oldPipelineReportedCount < oldPipelineThresholdCount) { + oldPipelineReportedCount++; + reportedPipelineIDSet.add(pipeline.getId()); + getSafeModeMetrics() + .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount(); + } else if (newPipelineReportedCount < newPipelineThresholdCount) { + newPipelineReportedCount++; + reportedPipelineIDSet.add(pipeline.getId()); + getSafeModeMetrics() + .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount(); + } } - currentReportedPipelineCount = reportedPipelineIDSet.size(); - if (scmInSafeMode()) { SCMSafeModeManager.getLogger().info( "SCM in safe mode. Pipelines with at least one datanode reported " + "count is {}, required at least one datanode reported per " + "pipeline count is {}", - currentReportedPipelineCount, thresholdCount); + newPipelineReportedCount + oldPipelineReportedCount, thresholdCount); } } @@ -133,6 +147,6 @@ public class OneReplicaPipelineSafeModeRule extends @VisibleForTesting public int getCurrentReportedPipelineCount() { - return currentReportedPipelineCount; + return newPipelineReportedCount + oldPipelineReportedCount; } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 7c2cffa..2b4db3f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -356,7 +356,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler); - eventQueue.addHandler(SCMEvents.CREATE_PIPELINE_STATUS, pipelineManager); registerMXBean(); registerMetricsSource(this); } @@ -403,7 +402,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl pipelineManager = configurator.getPipelineManager(); } else { pipelineManager = - new SCMPipelineManager(conf, scmNodeManager, eventQueue, this); + new SCMPipelineManager(conf, scmNodeManager, eventQueue); } if (configurator.getContainerManager() != null) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 46e8a51..8d71fe5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -43,12 +43,6 @@ import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .CreatePipelineStatus; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto @@ -368,21 +362,6 @@ public final class TestUtils { return new PipelineReportFromDatanode(dn, reportBuilder.build()); } - public static CreatePipelineStatus getPipelineCreateStatusFromDatanode( - DatanodeDetails dn, PipelineID pipelineID) { - CreatePipelineACKProto ack = CreatePipelineACKProto.newBuilder() - .setPipelineID(pipelineID.getProtobuf()) - .setDatanodeUUID(dn.getUuidString()) - .build(); - CommandStatus status = CommandStatus.newBuilder() - .setCreatePipelineAck(ack) - .setStatus(CommandStatus.Status.EXECUTED) - .setCmdId(0L) - .setType(Type.createPipelineCommand) - .build(); - return new CreatePipelineStatus(status); - } - public static void openAllRatisPipelines(PipelineManager pipelineManager) throws IOException { // Pipeline is created by background thread diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index ba3d4f4..b14c97d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -68,7 +68,7 @@ public class TestCloseContainerEventHandler { .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); nodeManager = new MockNodeManager(true, 10); pipelineManager = - new SCMPipelineManager(configuration, nodeManager, eventQueue, null); + new SCMPipelineManager(configuration, nodeManager, eventQueue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), configuration); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index 75a1ad3..bfdeac5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -94,7 +94,7 @@ public class TestSCMContainerManager { } nodeManager = new MockNodeManager(true, 10); pipelineManager = - new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); containerManager = new SCMContainerManager(conf, nodeManager, pipelineManager, new EventQueue()); xceiverClientManager = new XceiverClientManager(conf); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 2206e4d..b302f80 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -106,7 +106,7 @@ public class TestContainerPlacement { final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, OZONE_SCM_DB_CACHE_SIZE_DEFAULT); PipelineManager pipelineManager = - new SCMPipelineManager(config, scmNodeManager, eventQueue, null); + new SCMPipelineManager(config, scmNodeManager, eventQueue); return new SCMContainerManager(config, scmNodeManager, pipelineManager, eventQueue); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java index 639fc9a..3d9f05a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java @@ -67,7 +67,7 @@ public class TestHealthyPipelineSafeModeRule { HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, - nodeManager, eventQueue, null); + nodeManager, eventQueue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), config); @@ -111,7 +111,7 @@ public class TestHealthyPipelineSafeModeRule { HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, - nodeManager, eventQueue, null); + nodeManager, eventQueue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, @@ -186,7 +186,7 @@ public class TestHealthyPipelineSafeModeRule { HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, - nodeManager, eventQueue, null); + nodeManager, eventQueue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), config); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java index 7f8f0db..0fa5eae 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java @@ -68,7 +68,7 @@ public class TestOneReplicaPipelineSafeModeRule { eventQueue = new EventQueue(); pipelineManager = new SCMPipelineManager(ozoneConfiguration, mockNodeManager, - eventQueue, null); + eventQueue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(mockNodeManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index 0de0a73..99d4c0a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -194,7 +194,7 @@ public class TestSCMSafeModeManager { 0.9); MockNodeManager mockNodeManager = new MockNodeManager(true, 10); PipelineManager pipelineManager = new SCMPipelineManager(conf, - mockNodeManager, queue, null); + mockNodeManager, queue); scmSafeModeManager = new SCMSafeModeManager( conf, containers, pipelineManager, queue); fail("testFailWithIncorrectValueForHealthyPipelinePercent"); @@ -212,7 +212,7 @@ public class TestSCMSafeModeManager { 200); MockNodeManager mockNodeManager = new MockNodeManager(true, 10); PipelineManager pipelineManager = new SCMPipelineManager(conf, - mockNodeManager, queue, null); + mockNodeManager, queue); scmSafeModeManager = new SCMSafeModeManager( conf, containers, pipelineManager, queue); fail("testFailWithIncorrectValueForOneReplicaPipelinePercent"); @@ -229,7 +229,7 @@ public class TestSCMSafeModeManager { conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0); MockNodeManager mockNodeManager = new MockNodeManager(true, 10); PipelineManager pipelineManager = new SCMPipelineManager(conf, - mockNodeManager, queue, null); + mockNodeManager, queue); scmSafeModeManager = new SCMSafeModeManager( conf, containers, pipelineManager, queue); fail("testFailWithIncorrectValueForSafeModePercent"); @@ -253,7 +253,7 @@ public class TestSCMSafeModeManager { MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount); SCMPipelineManager pipelineManager = new SCMPipelineManager(conf, - mockNodeManager, queue, null); + mockNodeManager, queue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(mockNodeManager, pipelineManager.getStateManager(), config); @@ -470,7 +470,7 @@ public class TestSCMSafeModeManager { HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true); SCMPipelineManager pipelineManager = new SCMPipelineManager(config, - nodeManager, queue, null); + nodeManager, queue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 2e1fe9c..1e818f2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -28,10 +28,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .CreatePipelineStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; @@ -42,6 +44,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -77,7 +80,7 @@ public class TestSCMPipelineManager { @Test public void testPipelineReload() throws IOException { SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); @@ -94,7 +97,7 @@ public class TestSCMPipelineManager { // new pipeline manager should be able to load the pipelines from the db pipelineManager = - new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); @@ -117,7 +120,7 @@ public class TestSCMPipelineManager { @Test public void testRemovePipeline() throws IOException { SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); @@ -135,7 +138,7 @@ public class TestSCMPipelineManager { // new pipeline manager should not be able to load removed pipelines pipelineManager = - new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); try { pipelineManager.getPipeline(pipeline.getId()); Assert.fail("Pipeline should not have been retrieved"); @@ -151,13 +154,17 @@ public class TestSCMPipelineManager { public void testPipelineReport() throws IOException { EventQueue eventQueue = new EventQueue(); SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, eventQueue, null); + new SCMPipelineManager(conf, nodeManager, eventQueue); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider); + SCMSafeModeManager scmSafeModeManager = + new SCMSafeModeManager(new OzoneConfiguration(), + new ArrayList<>(), pipelineManager, eventQueue); + // create a pipeline in allocated state with no dns yet reported Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, @@ -168,14 +175,17 @@ public class TestSCMPipelineManager { Assert .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen()); - // get pipeline create status from each dn in the pipeline + // get pipeline report from each dn in the pipeline + PipelineReportHandler pipelineReportHandler = + new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); for (DatanodeDetails dn: pipeline.getNodes()) { + SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode pipelineReportFromDatanode = + TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); // pipeline is not healthy until all dns report Assert.assertFalse( pipelineManager.getPipeline(pipeline.getId()).isHealthy()); - CreatePipelineStatus response = - TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId()); - pipelineManager.onMessage(response, eventQueue); + pipelineReportHandler + .onMessage(pipelineReportFromDatanode, new EventQueue()); } // pipeline is healthy when all dns report @@ -189,10 +199,11 @@ public class TestSCMPipelineManager { pipelineManager.finalizeAndDestroyPipeline(pipeline, false); for (DatanodeDetails dn: pipeline.getNodes()) { - CreatePipelineStatus response = - TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId()); - // pipeline create status for destroyed pipeline should be ignored - pipelineManager.onMessage(response, new EventQueue()); + PipelineReportFromDatanode pipelineReportFromDatanode = + TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId()); + // pipeline report for destroyed pipeline should be ignored + pipelineReportHandler + .onMessage(pipelineReportFromDatanode, new EventQueue()); } try { @@ -211,7 +222,7 @@ public class TestSCMPipelineManager { MockNodeManager nodeManagerMock = new MockNodeManager(true, 20); SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManagerMock, new EventQueue()); PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManagerMock, pipelineManager.getStateManager(), conf); @@ -266,7 +277,7 @@ public class TestSCMPipelineManager { @Test public void testActivateDeactivatePipeline() throws IOException { final SCMPipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); final PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(nodeManager, pipelineManager.getStateManager(), conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 9845abb..b06a2fb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -65,7 +65,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState .HEALTHY; @@ -621,15 +620,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { if (hbInterval.isPresent()) { conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, hbInterval.get(), TimeUnit.MILLISECONDS); - conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, - hbInterval.get(), TimeUnit.MILLISECONDS); - } else { conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS); - conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, - DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS); } if (hbProcessorInterval.isPresent()) { --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org