HDDS-476. Add Pipeline reports to make pipeline active on SCM restart. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c0956ee2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c0956ee2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c0956ee2 Branch: refs/heads/YARN-7402 Commit: c0956ee2a879d1f82938dd2b8bab79b09ae32eac Parents: 0712537e Author: Nanda kumar <[email protected]> Authored: Wed Sep 19 18:49:13 2018 +0530 Committer: Nanda kumar <[email protected]> Committed: Wed Sep 19 18:52:08 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientRatis.java | 2 +- .../org/apache/hadoop/hdds/HddsConfigKeys.java | 5 + .../scm/container/common/helpers/Pipeline.java | 22 ++- .../container/common/helpers/PipelineID.java | 13 +- .../common/src/main/resources/ozone-default.xml | 8 + .../apache/hadoop/hdds/scm/HddsServerUtil.java | 21 +++ .../common/report/PipelineReportPublisher.java | 73 +++++++++ .../common/report/ReportPublisherFactory.java | 4 + .../states/endpoint/RegisterEndpointTask.java | 8 +- .../transport/server/XceiverServerGrpc.java | 16 ++ .../transport/server/XceiverServerSpi.java | 9 ++ .../server/ratis/XceiverServerRatis.java | 63 ++++---- .../container/ozoneimpl/OzoneContainer.java | 12 ++ .../StorageContainerDatanodeProtocol.java | 10 +- .../protocol/StorageContainerNodeProtocol.java | 6 +- ...rDatanodeProtocolClientSideTranslatorPB.java | 6 +- ...rDatanodeProtocolServerSideTranslatorPB.java | 5 +- .../StorageContainerDatanodeProtocol.proto | 10 ++ .../ozone/container/common/ScmTestMock.java | 8 +- .../hdds/scm/container/ContainerMapping.java | 19 --- .../scm/container/ContainerReportHandler.java | 6 +- .../hadoop/hdds/scm/container/Mapping.java | 15 +- .../hadoop/hdds/scm/events/SCMEvents.java | 15 +- .../hadoop/hdds/scm/node/SCMNodeManager.java | 5 +- .../hadoop/hdds/scm/node/StaleNodeHandler.java | 19 +-- .../hdds/scm/node/states/Node2ContainerMap.java | 123 ++------------ .../hdds/scm/node/states/Node2ObjectsMap.java | 162 +++++++++++++++++++ .../hdds/scm/node/states/ReportResult.java | 105 ++++++------ .../hdds/scm/pipelines/Node2PipelineMap.java | 45 +----- .../scm/pipelines/PipelineCloseHandler.java | 24 ++- .../hdds/scm/pipelines/PipelineManager.java | 52 +++--- .../scm/pipelines/PipelineReportHandler.java | 59 +++++++ .../hdds/scm/pipelines/PipelineSelector.java | 103 ++++++------ .../scm/pipelines/ratis/RatisManagerImpl.java | 41 ++--- .../standalone/StandaloneManagerImpl.java | 44 ++--- .../server/SCMDatanodeHeartbeatDispatcher.java | 23 +++ .../scm/server/SCMDatanodeProtocolServer.java | 16 +- .../scm/server/StorageContainerManager.java | 11 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 10 +- .../hdds/scm/container/MockNodeManager.java | 4 +- .../hadoop/hdds/scm/node/TestNodeManager.java | 6 +- .../scm/node/states/TestNode2ContainerMap.java | 28 ++-- .../ozone/container/common/TestEndPoint.java | 5 +- .../testutils/ReplicationNodeManagerMock.java | 5 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 2 +- .../hdds/scm/pipeline/TestPipelineClose.java | 2 +- .../hdds/scm/pipeline/TestSCMRestart.java | 20 ++- .../apache/hadoop/ozone/RatisTestHelper.java | 9 ++ .../hadoop/ozone/web/client/TestKeys.java | 2 +- .../hadoop/ozone/web/client/TestKeysRatis.java | 2 - hadoop-project/pom.xml | 2 +- 51 files changed, 809 insertions(+), 476 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 946abfb..4c4de7f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -110,7 +110,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient - .groupRemove(group.getGroupId(), peer.getId())); + .groupRemove(group.getGroupId(), true, peer.getId())); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java index 492be82..856d113 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java @@ -46,6 +46,11 @@ public final class HddsConfigKeys { public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT = "60s"; + public static final String HDDS_PIPELINE_REPORT_INTERVAL = + "hdds.pipeline.report.interval"; + public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT = + "60s"; + public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL = "hdds.command.status.report.interval"; public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT = http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index ef148e5..777efa7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.IOException; import java.util.ArrayList; import java.util.Map; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.List; /** @@ -83,7 +83,7 @@ public class Pipeline { this.type = replicationType; this.factor = replicationFactor; this.id = id; - datanodes = new TreeMap<>(); + datanodes = new ConcurrentHashMap<>(); } @Override @@ -151,9 +151,21 @@ public class Pipeline { return getDatanodes().get(leaderID); } - public void addMember(DatanodeDetails datanodeDetails) { - datanodes.put(datanodeDetails.getUuid().toString(), - datanodeDetails); + /** + * Adds a datanode to pipeline + * @param datanodeDetails datanode to be added. 
+ * @return true if the datanode was not already present, false otherwise + */ + public boolean addMember(DatanodeDetails datanodeDetails) { + return datanodes.put(datanodeDetails.getUuid().toString(), + datanodeDetails) == null; + + } + + public void resetPipeline() { + // reset datanodes in pipeline and learn about them through + // pipeline reports on SCM restart + datanodes.clear(); + } public Map<String, DatanodeDetails> getDatanodes() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java index 473ebc5..6e27a71 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java @@ -28,7 +28,7 @@ import java.util.UUID; * in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize * the ratis group they are part of. */ -public class PipelineID { +public final class PipelineID implements Comparable<PipelineID> { private UUID id; private RaftGroupId groupId; @@ -42,8 +42,12 @@ public class PipelineID { return new PipelineID(UUID.randomUUID()); } + public static PipelineID valueOf(UUID id) { + return new PipelineID(id); + } + public static PipelineID valueOf(RaftGroupId groupId) { - return new PipelineID(groupId.getUuid()); + return valueOf(groupId.getUuid()); } public RaftGroupId getRaftGroupID() { @@ -68,6 +72,11 @@ public class PipelineID { } @Override + public int compareTo(PipelineID o) { + return this.id.compareTo(o.id); + } + + @Override public boolean equals(Object o) { if (this == o) { return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index a74124e..f7681e8 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -224,6 +224,14 @@ received from SCM to SCM. Unit could be defined with postfix (ns,ms,s,m,h,d)</description> </property> + <property> + <name>hdds.pipeline.report.interval</name> + <value>60000ms</value> + <tag>OZONE, PIPELINE, MANAGEMENT</tag> + <description>Time interval of the datanode to send pipeline report. Each + datanode periodically sends a pipeline report to SCM. 
Unit could be + defined with postfix (ns,ms,s,m,h,d)</description> + </property> <!--Ozone Settings--> <property> <name>ozone.administrators</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java index 580d027..d505be3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -312,4 +315,22 @@ public final class HddsServerUtil { services.put(OZONE_SCM_SERVICE_ID, serviceInstances); return services; } + + public static String getOzoneDatanodeRatisDirectory(Configuration conf) { + final String ratisDir = File.separator + "ratis"; + String storageDir = conf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); + + if (Strings.isNullOrEmpty(storageDir)) { + storageDir = conf.get(OzoneConfigKeys + .OZONE_METADATA_DIRS); + Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " + + "cannot be null, Please check your configs."); + storageDir = storageDir.concat(ratisDir); + LOG.warn("Storage directory for Ratis is not configured." + + "Mapping Ratis storage under {}. It is a good idea " + + "to map this to an SSD disk.", storageDir); + } + return storageDir; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java new file mode 100644 index 0000000..e7f4347 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.HddsServerUtil; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT; + + +/** + * Publishes the pipeline report which will be sent to SCM as part of the heartbeat. + * PipelineReport consists of the following information about each pipeline: + * - pipelineID + * + */ +public class PipelineReportPublisher extends + ReportPublisher<PipelineReportsProto> { + + private Long pipelineReportInterval = null; + + @Override + protected long getReportFrequency() { + if (pipelineReportInterval == null) { + pipelineReportInterval = getConf().getTimeDuration( + HDDS_PIPELINE_REPORT_INTERVAL, + HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval( + getConf()); + + Preconditions.checkState( + heartbeatFrequency <= pipelineReportInterval, + HDDS_PIPELINE_REPORT_INTERVAL + + " cannot be configured lower than heartbeat frequency."); + } + // Add a random delay (0~30s) on top of the pipeline report + // interval (60s) so that the SCM is not overwhelmed by the pipeline reports + // sent in sync. + return pipelineReportInterval + getRandomReportDelay(); + } + + private long getRandomReportDelay() { + return RandomUtils.nextLong(0, pipelineReportInterval); + } + + @Override + protected PipelineReportsProto getReport() { + return getContext().getParent().getContainer().getPipelineReport(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java index ea89280..1c456a0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java @@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.container.common.report; import com.google.protobuf.GeneratedMessage; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@ -53,6 +55,8 @@ public class ReportPublisherFactory { ContainerReportPublisher.class); report2publisher.put(CommandStatusReportsProto.class, CommandStatusReportPublisher.class); + report2publisher.put(PipelineReportsProto.class, + PipelineReportPublisher.class); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index ccab095..690aa01 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -21,6 +21,8 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; import org.apache.hadoop.hdds.protocol.proto @@ -108,13 +110,15 @@ public final class RegisterEndpointTask implements rpcEndPoint.lock(); try { - ContainerReportsProto contianerReport = datanodeContainerManager + ContainerReportsProto containerReport = datanodeContainerManager .getContainerReport(); NodeReportProto nodeReport = datanodeContainerManager.getNodeReport(); + PipelineReportsProto pipelineReportsProto = + datanodeContainerManager.getPipelineReport(); // TODO : Add responses to the command Queue. 
SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint() .register(datanodeDetails.getProtoBufMessage(), nodeReport, - contianerReport); + containerReport, pipelineReportsProto); Preconditions.checkState(UUID.fromString(response.getDatanodeUUID()) .equals(datanodeDetails.getUuid()), "Unexpected datanode ID in the response."); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 4a90144..83e742c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -38,6 +41,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.UUID; /** * Creates a Grpc server endpoint that acts as the communication layer for @@ -47,6 +53,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { private static final Logger LOG = LoggerFactory.getLogger(XceiverServerGrpc.class); private int port; + private UUID id; private Server server; private final ContainerDispatcher storageContainer; @@ -59,6 +66,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { ContainerDispatcher dispatcher, BindableService... 
additionalServices) { Preconditions.checkNotNull(conf); + this.id = datanodeDetails.getUuid(); this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); // Get an available port on current node and @@ -123,4 +131,12 @@ public final class XceiverServerGrpc implements XceiverServerSpi { HddsProtos.PipelineID pipelineID) { storageContainer.dispatch(request); } + + @Override + public List<PipelineReport> getPipelineReport() { + return Collections.singletonList( + PipelineReport.newBuilder() + .setPipelineID(PipelineID.valueOf(id).getProtobuf()) + .build()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index 1863f6d..8c3fa5c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -21,8 +21,11 @@ package org.apache.hadoop.ozone.container.common.transport.server; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; import java.io.IOException; +import java.util.List; /** A server endpoint that acts as the communication layer for Ozone * containers. */ @@ -49,4 +52,10 @@ public interface XceiverServerSpi { void submitRequest(ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException; + + /** + * Get pipeline report for the XceiverServer instance. + * @return list of report for each pipeline. 
+ */ + List<PipelineReport> getPipelineReport(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 24ea0b9..d88995b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -19,17 +19,18 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -68,6 +69,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; @@ -96,12 +99,12 @@ public final class XceiverServerRatis implements XceiverServerSpi { private final ReplicationLevel replicationLevel; private long nodeFailureTimeoutMs; - private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir, + private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, Configuration conf, StateContext context) throws IOException { Objects.requireNonNull(dd, "id == null"); this.port = port; - RaftProperties serverProperties = newRaftProperties(conf, storageDir); + RaftProperties serverProperties = newRaftProperties(conf); final int numWriteChunkThreads = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); @@ -118,15 +121,13 @@ public final class XceiverServerRatis implements XceiverServerSpi { new ContainerStateMachine(dispatcher, chunkExecutor, this); this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) - .setGroup(RatisHelper.emptyRaftGroup()) .setProperties(serverProperties) .setStateMachine(stateMachine) .build(); } - private RaftProperties newRaftProperties(Configuration conf, - String storageDir) { + private RaftProperties newRaftProperties(Configuration conf) { final RaftProperties properties 
= new RaftProperties(); // Set rpc type @@ -235,6 +236,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS); // Set the ratis storage directory + String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf); RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); // For grpc set the maximum message size @@ -253,23 +255,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { public static XceiverServerRatis newXceiverServerRatis( DatanodeDetails datanodeDetails, Configuration ozoneConf, ContainerDispatcher dispatcher, StateContext context) throws IOException { - final String ratisDir = File.separator + "ratis"; int localPort = ozoneConf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); - String storageDir = ozoneConf.get( - OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); - - if (Strings.isNullOrEmpty(storageDir)) { - storageDir = ozoneConf.get(OzoneConfigKeys - .OZONE_METADATA_DIRS); - Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " + - "cannot be null, Please check your configs."); - storageDir = storageDir.concat(ratisDir); - LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " + - "storage under {}. It is a good idea to map this to an SSD disk.", - storageDir); - } // Get an available port on current node and // use that as the container port @@ -282,13 +270,6 @@ public final class XceiverServerRatis implements XceiverServerSpi { socket.bind(address); localPort = socket.getLocalPort(); LOG.info("Found a free port for the server : {}", localPort); - // If we have random local ports configured this means that it - // probably running under MiniOzoneCluster. Ratis locks the storage - // directories, so we need to pass different local directory for each - // local instance. So we map ratis directories under datanode ID. - storageDir = - storageDir.concat(File.separator + - datanodeDetails.getUuidString()); } catch (IOException e) { LOG.error("Unable find a random free port for the server, " + "fallback to use default port {}", localPort, e); @@ -296,7 +277,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { } datanodeDetails.setPort( DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort)); - return new XceiverServerRatis(datanodeDetails, localPort, storageDir, + return new XceiverServerRatis(datanodeDetails, localPort, dispatcher, ozoneConf, context); } @@ -363,7 +344,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { public void submitRequest( ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException { - // ReplicationLevel.ALL ensures the transactions corresponding to + // ReplicationLevel.MAJORITY ensures the transactions corresponding to // the request here are applied on all the raft servers. 
RaftClientRequest raftClientRequest = createRaftClientRequest(request, pipelineID, @@ -427,13 +408,27 @@ public final class XceiverServerRatis implements XceiverServerSpi { + ".Reason : " + action.getClosePipeline().getDetailedReason()); } - void handleNodeSlowness( - RaftGroup group, RoleInfoProto roleInfoProto) { + @Override + public List<PipelineReport> getPipelineReport() { + try { + Iterable<RaftGroupId> gids = server.getGroupIds(); + List<PipelineReport> reports = new ArrayList<>(); + for (RaftGroupId groupId : gids) { + reports.add(PipelineReport.newBuilder() + .setPipelineID(PipelineID.valueOf(groupId).getProtobuf()) + .build()); + } + return reports; + } catch (Exception e) { + return null; + } + } + + void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) { handlePipelineFailure(group.getGroupId(), roleInfoProto); } - void handleNoLeader( - RaftGroup group, RoleInfoProto roleInfoProto) { + void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { handlePipelineFailure(group.getGroupId(), roleInfoProto); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 72a5804..ebacf75 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -164,6 +166,16 @@ public class OzoneContainer { return this.containerSet.getContainerReport(); } + public PipelineReportsProto getPipelineReport() { + PipelineReportsProto.Builder pipelineReportsProto = + PipelineReportsProto.newBuilder(); + for (XceiverServerSpi serverInstance : server) { + pipelineReportsProto + .addAllPipelineReport(serverInstance.getPipelineReport()); + } + return pipelineReportsProto.build(); + } + /** * Submit ContainerRequest. 
* @param request http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index a950a31..9296524 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; @@ -69,9 +71,11 @@ public interface StorageContainerDatanodeProtocol { * @param containerReportsRequestProto - Container Reports. * @return SCM Command. */ - SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails, - NodeReportProto nodeReport, ContainerReportsProto - containerReportsRequestProto) throws IOException; + SCMRegisteredResponseProto register( + DatanodeDetailsProto datanodeDetails, + NodeReportProto nodeReport, + ContainerReportsProto containerReportsRequestProto, + PipelineReportsProto pipelineReports) throws IOException; /** * Used by datanode to send block deletion ACK to SCM. http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index c9ef43f..b3c3eb3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; @@ -51,10 +53,12 @@ public interface StorageContainerNodeProtocol { * Register the node if the node finds that it is not registered with any SCM. 
* @param datanodeDetails DatanodeDetails * @param nodeReport NodeReportProto + * @param pipelineReport PipelineReportsProto * @return SCMHeartbeatResponseProto */ RegisteredCommand register(DatanodeDetails datanodeDetails, - NodeReportProto nodeReport); + NodeReportProto nodeReport, + PipelineReportsProto pipelineReport); /** * Send heartbeat to indicate the datanode is alive and doing well. http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index 40fe189..b9cf6f9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -20,6 +20,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; @@ -149,12 +151,14 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB @Override public SCMRegisteredResponseProto register( DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, - ContainerReportsProto containerReportsRequestProto) + ContainerReportsProto containerReportsRequestProto, + PipelineReportsProto pipelineReportsProto) throws IOException { SCMRegisterRequestProto.Builder req = SCMRegisterRequestProto.newBuilder(); req.setDatanodeDetails(datanodeDetailsProto); req.setContainerReport(containerReportsRequestProto); + req.setPipelineReports(pipelineReportsProto); req.setNodeReport(nodeReport); final SCMRegisteredResponseProto response; try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 7e8bd8a..ed01822 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import 
org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; @@ -76,8 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB ContainerReportsProto containerRequestProto = request .getContainerReport(); NodeReportProto dnNodeReport = request.getNodeReport(); + PipelineReportsProto pipelineReport = request.getPipelineReports(); return impl.register(request.getDatanodeDetails(), dnNodeReport, - containerRequestProto); + containerRequestProto, pipelineReport); } catch (IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 0a69343..78758cb 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -52,6 +52,7 @@ message SCMRegisterRequestProto { required DatanodeDetailsProto datanodeDetails = 1; required NodeReportProto nodeReport = 2; required ContainerReportsProto containerReport = 3; + required PipelineReportsProto pipelineReports = 4; } /** @@ -82,6 +83,7 @@ message SCMHeartbeatRequestProto { optional CommandStatusReportsProto commandStatusReport = 4; optional ContainerActionsProto containerActions = 5; optional PipelineActionsProto pipelineActions = 6; + optional PipelineReportsProto pipelineReports = 7; } /* @@ -163,6 +165,14 @@ message ContainerAction { optional Reason reason = 3; } +message PipelineReport { + required PipelineID pipelineID = 1; +} + +message PipelineReportsProto { + repeated PipelineReport pipelineReport = 1; +} + message PipelineActionsProto { repeated PipelineAction pipelineActions = 1; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index 751775f..27b6272 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -18,6 +18,10 @@ package org.apache.hadoop.ozone.container.common; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto. 
StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.scm.VersionInfo; @@ -214,8 +218,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { public StorageContainerDatanodeProtocolProtos .SCMRegisteredResponseProto register( DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, - StorageContainerDatanodeProtocolProtos.ContainerReportsProto - containerReportsRequestProto) + ContainerReportsProto containerReportsRequestProto, + PipelineReportsProto pipelineReportsProto) throws IOException { rpcCount.incrementAndGet(); updateNodeReport(datanodeDetailsProto, nodeReport); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 206e24b..eb0a0b4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -466,24 +466,6 @@ public class ContainerMapping implements Mapping { return new ContainerWithPipeline(containerInfo, pipeline); } - public void handlePipelineClose(PipelineID pipelineID) { - try { - Pipeline pipeline = pipelineSelector.getPipeline(pipelineID); - if (pipeline != null) { - pipelineSelector.finalizePipeline(pipeline); - } else { - LOG.debug("pipeline:{} not found", pipelineID); - } - } catch (Exception e) { - LOG.info("failed to close pipeline:{}", pipelineID, e); - } - } - - public Set<PipelineID> getPipelineOnDatanode( - DatanodeDetails datanodeDetails) { - return pipelineSelector.getPipelineId(datanodeDetails.getUuid()); - } - /** * Process container report from Datanode. * <p> @@ -710,7 +692,6 @@ public class ContainerMapping implements Mapping { return containerStore; } - @VisibleForTesting public PipelineSelector getPipelineSelector() { return pipelineSelector; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index dcbd49c..3f156de 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -89,20 +89,20 @@ public class ContainerReportHandler implements .map(ContainerID::new) .collect(Collectors.toSet()); - ReportResult reportResult = node2ContainerMap + ReportResult<ContainerID> reportResult = node2ContainerMap .processReport(datanodeOrigin.getUuid(), containerIds); //we have the report, so we can update the states for the next iteration. 
node2ContainerMap .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); - for (ContainerID containerID : reportResult.getMissingContainers()) { + for (ContainerID containerID : reportResult.getMissingEntries()) { containerStateManager .removeContainerReplica(containerID, datanodeOrigin); checkReplicationState(containerID, publisher); } - for (ContainerID containerID : reportResult.getNewContainers()) { + for (ContainerID containerID : reportResult.getNewEntries()) { containerStateManager.addContainerReplica(containerID, datanodeOrigin); checkReplicationState(containerID, publisher); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java index 1b0c57c..5ed80cb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java @@ -25,13 +25,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; /** * Mapping class contains the mapping from a name to a pipeline mapping. This is @@ -138,15 +137,5 @@ public interface Mapping extends Closeable { String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException; - /** - * Handle a pipeline close event. - * @param pipelineID pipeline id - */ - void handlePipelineClose(PipelineID pipelineID); - - /** - * Get set of pipeline for a specific datanode. - * @param datanodeDetails datanode for which pipelines needs to be fetched. 
- */ - Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails); + PipelineSelector getPipelineSelector(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 9d72eb1..745e052 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -27,10 +27,13 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .DeleteBlockCommandStatus; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .ReplicationStatus; -import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq; +import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler + .CloseContainerRetryableReq; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .PipelineReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .PipelineActionsFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerActionsFromDatanode; @@ -72,8 +75,7 @@ public final class SCMEvents { /** * ContainerReports are send out by Datanodes. This report is received by - * SCMDatanodeHeartbeatDispatcher and Container_Report Event - * isTestSCMDatanodeHeartbeatDispatcher generated. + * SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated. */ public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT = new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report"); @@ -87,6 +89,13 @@ public final class SCMEvents { "Container_Actions"); /** + * PipelineReports are sent out by Datanodes. This report is received by + * SCMDatanodeHeartbeatDispatcher and Pipeline_Report Event is generated. + */ + public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT = + new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report"); + + /** * PipelineActions are sent by Datanode. This event is received by * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index fca08bd..58da1cc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -363,7 +365,8 @@ public class SCMNodeManager */ @Override public RegisteredCommand register( - DatanodeDetails datanodeDetails, NodeReportProto nodeReport) { + DatanodeDetails datanodeDetails, NodeReportProto nodeReport, + PipelineReportsProto pipelineReportsProto) { InetAddress dnAddress = Server.getRemoteIp(); if (dnAddress != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java index b435e77..ddbba82 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -19,17 +19,13 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.Mapping; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Set; - /** * Handles Stale node event. 
*/ @@ -37,22 +33,17 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> { static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class); private final Node2ContainerMap node2ContainerMap; - private final Mapping containerManager; + private final PipelineSelector pipelineSelector; public StaleNodeHandler(Node2ContainerMap node2ContainerMap, - Mapping containerManager) { + PipelineSelector pipelineSelector) { this.node2ContainerMap = node2ContainerMap; - this.containerManager = containerManager; + this.pipelineSelector = pipelineSelector; } @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - Set<PipelineID> pipelineIDs = - containerManager.getPipelineOnDatanode(datanodeDetails); - for (PipelineID id : pipelineIDs) { - LOG.info("closing pipeline {}.", id); - publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id); - } + pipelineSelector.handleStaleNode(datanodeDetails); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java index 97c254b..549080a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java @@ -18,13 +18,9 @@ package org.apache.hadoop.hdds.scm.node.states; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -32,34 +28,29 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes - .DUPLICATE_DATANODE; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes .NO_SUCH_DATANODE; /** * This data structure maintains the list of containers that is on a datanode. * This information is built from the DN container reports. */ -public class Node2ContainerMap { - private final Map<UUID, Set<ContainerID>> dn2ContainerMap; +public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> { /** * Constructs a Node2ContainerMap Object. */ public Node2ContainerMap() { - dn2ContainerMap = new ConcurrentHashMap<>(); + super(); } /** - * Returns true if this a datanode that is already tracked by - * Node2ContainerMap. + * Returns null if there no containers associated with this datanode ID. * - * @param datanodeID - UUID of the Datanode. - * @return True if this is tracked, false if this map does not know about it. + * @param datanode - UUID + * @return Set of containers or Null. 
*/ - public boolean isKnownDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - return dn2ContainerMap.containsKey(datanodeID); + public Set<ContainerID> getContainers(UUID datanode) { + return getObjects(datanode); } /** @@ -70,13 +61,7 @@ public class Node2ContainerMap { */ public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs) throws SCMException { - Preconditions.checkNotNull(containerIDs); - Preconditions.checkNotNull(datanodeID); - if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs)) - != null) { - throw new SCMException("Node already exists in the map", - DUPLICATE_DATANODE); - } + super.insertNewDatanode(datanodeID, containerIDs); } /** @@ -91,103 +76,15 @@ public class Node2ContainerMap { Set<ContainerID> containers) throws SCMException { Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(containers); - if (dn2ContainerMap + if (dn2ObjectMap .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers)) == null) { throw new SCMException("No such datanode", NO_SUCH_DATANODE); } } - /** - * Removes datanode Entry from the map. - * - * @param datanodeID - Datanode ID. - */ - public void removeDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> null); - } - - /** - * Returns null if there no containers associated with this datanode ID. - * - * @param datanode - UUID - * @return Set of containers or Null. - */ - public Set<ContainerID> getContainers(UUID datanode) { - Preconditions.checkNotNull(datanode); - return dn2ContainerMap.computeIfPresent(datanode, (k, v) -> - Collections.unmodifiableSet(v)); - } - - public ReportResult processReport(UUID datanodeID, Set<ContainerID> - containers) { - Preconditions.checkNotNull(datanodeID); - Preconditions.checkNotNull(containers); - - if (!isKnownDatanode(datanodeID)) { - return ReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.NEW_DATANODE_FOUND) - .setNewContainers(containers) - .build(); - } - - // Conditions like Zero length containers should be handled by removeAll. 
- Set<ContainerID> currentSet = dn2ContainerMap.get(datanodeID); - TreeSet<ContainerID> newContainers = new TreeSet<>(containers); - newContainers.removeAll(currentSet); - - TreeSet<ContainerID> missingContainers = new TreeSet<>(currentSet); - missingContainers.removeAll(containers); - - if (newContainers.isEmpty() && missingContainers.isEmpty()) { - return ReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.ALL_IS_WELL) - .build(); - } - - if (newContainers.isEmpty() && !missingContainers.isEmpty()) { - return ReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.MISSING_CONTAINERS) - .setMissingContainers(missingContainers) - .build(); - } - - if (!newContainers.isEmpty() && missingContainers.isEmpty()) { - return ReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.NEW_CONTAINERS_FOUND) - .setNewContainers(newContainers) - .build(); - } - - if (!newContainers.isEmpty() && !missingContainers.isEmpty()) { - return ReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND) - .setNewContainers(newContainers) - .setMissingContainers(missingContainers) - .build(); - } - - // default status & Make compiler happy - return ReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.ALL_IS_WELL) - .build(); - } - - /** - * Results possible from processing a container report by - * Node2ContainerMapper. - */ - public enum ReportStatus { - ALL_IS_WELL, - MISSING_CONTAINERS, - NEW_CONTAINERS_FOUND, - MISSING_AND_NEW_CONTAINERS_FOUND, - NEW_DATANODE_FOUND - } - @VisibleForTesting public int size() { - return dn2ContainerMap.size(); + return dn2ObjectMap.size(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java new file mode 100644 index 0000000..e49a79c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ * + */ + +package org.apache.hadoop.hdds.scm.node.states; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; + +import java.util.UUID; +import java.util.Set; +import java.util.Map; +import java.util.TreeSet; +import java.util.HashSet; +import java.util.Collections; + +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE; + +/** + * This data structure maintains the set of objects (for example, containers + * or pipelines) that are on a datanode. This information is built from the + * datanode reports. + */ +public class Node2ObjectsMap<T> { + protected final Map<UUID, Set<T>> dn2ObjectMap; + + /** + * Constructs a Node2ObjectsMap Object. + */ + public Node2ObjectsMap() { + dn2ObjectMap = new ConcurrentHashMap<>(); + } + + /** + * Returns true if this is a datanode that is already tracked by + * this map. + * + * @param datanodeID - UUID of the Datanode. + * @return True if this is tracked, false if this map does not know about it. + */ + public boolean isKnownDatanode(UUID datanodeID) { + Preconditions.checkNotNull(datanodeID); + return dn2ObjectMap.containsKey(datanodeID); + } + + /** + * Insert a new datanode into the map. + * + * @param datanodeID -- Datanode UUID + * @param containerIDs - Set of objects reported by this datanode. + */ + public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs) + throws SCMException { + Preconditions.checkNotNull(containerIDs); + Preconditions.checkNotNull(datanodeID); + if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs)) + != null) { + throw new SCMException("Node already exists in the map", + DUPLICATE_DATANODE); + } + } + + /** + * Removes datanode Entry from the map. + * + * @param datanodeID - Datanode ID. + */ + void removeDatanode(UUID datanodeID) { + Preconditions.checkNotNull(datanodeID); + dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null); + } + + /** + * Returns the objects associated with this datanode ID, or an empty set + * if there are none. + * + * @param datanode - UUID + * @return unmodifiable Set of objects, possibly empty. + */ + Set<T> getObjects(UUID datanode) { + Preconditions.checkNotNull(datanode); + final Set<T> s = dn2ObjectMap.get(datanode); + return s != null? Collections.unmodifiableSet(s): Collections.emptySet(); + } + + public ReportResult.ReportResultBuilder<T> newBuilder() { + return new ReportResult.ReportResultBuilder<>(); + } + + public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) { + Preconditions.checkNotNull(datanodeID); + Preconditions.checkNotNull(objects); + + if (!isKnownDatanode(datanodeID)) { + return newBuilder() + .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND) + .setNewEntries(objects) + .build(); + } + + // Conditions like Zero length containers should be handled by removeAll.
+ Set<T> currentSet = dn2ObjectMap.get(datanodeID); + TreeSet<T> newObjects = new TreeSet<>(objects); + newObjects.removeAll(currentSet); + + TreeSet<T> missingObjects = new TreeSet<>(currentSet); + missingObjects.removeAll(objects); + + if (newObjects.isEmpty() && missingObjects.isEmpty()) { + return newBuilder() + .setStatus(ReportResult.ReportStatus.ALL_IS_WELL) + .build(); + } + + if (newObjects.isEmpty() && !missingObjects.isEmpty()) { + return newBuilder() + .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES) + .setMissingEntries(missingObjects) + .build(); + } + + if (!newObjects.isEmpty() && missingObjects.isEmpty()) { + return newBuilder() + .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND) + .setNewEntries(newObjects) + .build(); + } + + if (!newObjects.isEmpty() && !missingObjects.isEmpty()) { + return newBuilder() + .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND) + .setNewEntries(newObjects) + .setMissingEntries(missingObjects) + .build(); + } + + // default status & Make compiler happy + return newBuilder() + .setStatus(ReportResult.ReportStatus.ALL_IS_WELL) + .build(); + } + + @VisibleForTesting + public int size() { + return dn2ObjectMap.size(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java index 9bb6cf1..0c7610f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java @@ -19,83 +19,92 @@ package org.apache.hadoop.hdds.scm.node.states; -import org.apache.hadoop.hdds.scm.container.ContainerID; - import java.util.Collections; import java.util.Set; import com.google.common.base.Preconditions; /** - * A Container Report gets processsed by the Node2Container and returns - * Report Result class. + * A Container/Pipeline Report gets processed by the + * Node2Container/Node2Pipeline and returns Report Result class. 
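For illustration (not part of the patch itself): the new Node2ObjectsMap generalizes the old per-datanode container map so the same report-diffing logic can back both container and pipeline tracking. A minimal sketch of how processReport classifies a follow-up report; Long is used as the tracked type purely to keep the example self-contained (the tracked type needs a natural ordering, since processReport builds TreeSets), and insertNewDatanode throws SCMException if the datanode is already present:

    // Illustrative only (needs java.util.UUID, Arrays and HashSet).
    Node2ObjectsMap<Long> map = new Node2ObjectsMap<>();
    UUID dn = UUID.randomUUID();
    map.insertNewDatanode(dn, new HashSet<>(Arrays.asList(1L, 2L)));

    // The follow-up report drops 2L and adds 3L, so both deltas are surfaced.
    ReportResult<Long> result =
        map.processReport(dn, new HashSet<>(Arrays.asList(1L, 3L)));
    assert result.getStatus()
        == ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND;
    assert result.getNewEntries().contains(3L);
    assert result.getMissingEntries().contains(2L);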
*/ -public class ReportResult { - private Node2ContainerMap.ReportStatus status; - private Set<ContainerID> missingContainers; - private Set<ContainerID> newContainers; - - ReportResult(Node2ContainerMap.ReportStatus status, - Set<ContainerID> missingContainers, - Set<ContainerID> newContainers) { +public final class ReportResult<T> { + private ReportStatus status; + private Set<T> missingEntries; + private Set<T> newEntries; + + private ReportResult(ReportStatus status, + Set<T> missingEntries, + Set<T> newEntries) { this.status = status; - Preconditions.checkNotNull(missingContainers); - Preconditions.checkNotNull(newContainers); - this.missingContainers = missingContainers; - this.newContainers = newContainers; + Preconditions.checkNotNull(missingEntries); + Preconditions.checkNotNull(newEntries); + this.missingEntries = missingEntries; + this.newEntries = newEntries; } - public Node2ContainerMap.ReportStatus getStatus() { + public ReportStatus getStatus() { return status; } - public Set<ContainerID> getMissingContainers() { - return missingContainers; + public Set<T> getMissingEntries() { + return missingEntries; } - public Set<ContainerID> getNewContainers() { - return newContainers; + public Set<T> getNewEntries() { + return newEntries; } - static class ReportResultBuilder { - private Node2ContainerMap.ReportStatus status; - private Set<ContainerID> missingContainers; - private Set<ContainerID> newContainers; - - static ReportResultBuilder newBuilder() { - return new ReportResultBuilder(); - } - - public ReportResultBuilder setStatus( - Node2ContainerMap.ReportStatus newstatus) { - this.status = newstatus; + /** + * Result after processing report for node2Object map. + * @param <T> + */ + public static class ReportResultBuilder<T> { + private ReportStatus status; + private Set<T> missingEntries; + private Set<T> newEntries; + + public ReportResultBuilder<T> setStatus( + ReportStatus newStatus) { + this.status = newStatus; return this; } - public ReportResultBuilder setMissingContainers( - Set<ContainerID> missingContainersLit) { - this.missingContainers = missingContainersLit; + public ReportResultBuilder<T> setMissingEntries( + Set<T> missingEntriesList) { + this.missingEntries = missingEntriesList; return this; } - public ReportResultBuilder setNewContainers( - Set<ContainerID> newContainersList) { - this.newContainers = newContainersList; + public ReportResultBuilder<T> setNewEntries( + Set<T> newEntriesList) { + this.newEntries = newEntriesList; return this; } - ReportResult build() { + public ReportResult<T> build() { - Set<ContainerID> nullSafeMissingContainers = this.missingContainers; - Set<ContainerID> nullSafeNewContainers = this.newContainers; - if (nullSafeNewContainers == null) { - nullSafeNewContainers = Collections.emptySet(); + Set<T> nullSafeMissingEntries = this.missingEntries; + Set<T> nullSafeNewEntries = this.newEntries; + if (nullSafeNewEntries == null) { + nullSafeNewEntries = Collections.emptySet(); } - if (nullSafeMissingContainers == null) { - nullSafeMissingContainers = Collections.emptySet(); + if (nullSafeMissingEntries == null) { + nullSafeMissingEntries = Collections.emptySet(); } - return new ReportResult(status, nullSafeMissingContainers, - nullSafeNewContainers); + return new ReportResult<T>(status, nullSafeMissingEntries, + nullSafeNewEntries); } } + + /** + * Results possible from processing a report. 
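For illustration (not part of the patch itself): the builder above null-safes both entry sets, so a caller that only sets a status still gets empty collections back rather than null:

    // Illustrative only: unset entry sets come back as empty sets.
    ReportResult<Long> ok = new ReportResult.ReportResultBuilder<Long>()
        .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
        .build();
    assert ok.getNewEntries().isEmpty() && ok.getMissingEntries().isEmpty();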
+ */ + public enum ReportStatus { + ALL_IS_WELL, + MISSING_ENTRIES, + NEW_ENTRIES_FOUND, + MISSING_AND_NEW_ENTRIES_FOUND, + NEW_DATANODE_FOUND, + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java index 363ce71..87f2222 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java @@ -16,19 +16,15 @@ * */ -package org.apache.hadoop.hdds.scm.pipelines; +package org.apache.hadoop.hdds.scm.node.states; -import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; /** * This data structure maintains the list of pipelines which the given datanode is a part of. This @@ -36,33 +32,11 @@ import java.util.concurrent.ConcurrentHashMap; * * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart */ -public class Node2PipelineMap { - private final Map<UUID, Set<PipelineID>> dn2PipelineMap; +public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> { /** Constructs a Node2PipelineMap Object. */ public Node2PipelineMap() { - dn2PipelineMap = new ConcurrentHashMap<>(); - } - - /** - * Returns true if this a datanode that is already tracked by Node2PipelineMap. - * - * @param datanodeID - UUID of the Datanode. - * @return True if this is tracked, false if this map does not know about it. - */ - private boolean isKnownDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - return dn2PipelineMap.containsKey(datanodeID); - } - - /** - * Removes datanode Entry from the map. - * - * @param datanodeID - Datanode ID. - */ - public synchronized void removeDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null); + super(); } /** @@ -72,9 +46,7 @@ public class Node2PipelineMap { * @return Set of pipelines or Null. */ public Set<PipelineID> getPipelines(UUID datanode) { - Preconditions.checkNotNull(datanode); - final Set<PipelineID> s = dn2PipelineMap.get(datanode); - return s != null? 
Collections.unmodifiableSet(s): Collections.emptySet(); + return getObjects(datanode); } /** @@ -85,7 +57,7 @@ public class Node2PipelineMap { public synchronized void addPipeline(Pipeline pipeline) { for (DatanodeDetails details : pipeline.getDatanodes().values()) { UUID dnId = details.getUuid(); - dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>()) + dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>()) .add(pipeline.getId()); } } @@ -93,16 +65,11 @@ public class Node2PipelineMap { public synchronized void removePipeline(Pipeline pipeline) { for (DatanodeDetails details : pipeline.getDatanodes().values()) { UUID dnId = details.getUuid(); - dn2PipelineMap.computeIfPresent( - dnId, + dn2ObjectMap.computeIfPresent(dnId, (k, v) -> { v.remove(pipeline.getId()); return v; }); } } - - public Map<UUID, Set<PipelineID>> getDn2PipelineMap() { - return Collections.unmodifiableMap(dn2PipelineMap); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java index 733dec5..e49678f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java @@ -17,22 +17,36 @@ package org.apache.hadoop.hdds.scm.pipelines; -import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handles pipeline close event. 
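For illustration (not part of the patch itself): with Node2PipelineMap reduced to a thin subclass of Node2ObjectsMap&lt;PipelineID&gt;, the per-datanode pipeline index is driven entirely through the few public methods kept in the diff above. A short sketch, assuming pipeline and dnUuid values already exist:

    // Illustrative only. addPipeline indexes the pipeline id under every
    // member datanode; getPipelines now returns an unmodifiable, possibly
    // empty, set instead of null.
    Node2PipelineMap node2PipelineMap = new Node2PipelineMap();
    node2PipelineMap.addPipeline(pipeline);
    Set<PipelineID> onThisDn = node2PipelineMap.getPipelines(dnUuid);
    node2PipelineMap.removePipeline(pipeline);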
*/ public class PipelineCloseHandler implements EventHandler<PipelineID> { - private final Mapping mapping; - public PipelineCloseHandler(Mapping mapping) { - this.mapping = mapping; + private static final Logger LOG = LoggerFactory + .getLogger(PipelineCloseHandler.class); + + private final PipelineSelector pipelineSelector; + public PipelineCloseHandler(PipelineSelector pipelineSelector) { + this.pipelineSelector = pipelineSelector; } @Override public void onMessage(PipelineID pipelineID, EventPublisher publisher) { - mapping.handlePipelineClose(pipelineID); + Pipeline pipeline = pipelineSelector.getPipeline(pipelineID); + try { + if (pipeline != null) { + pipelineSelector.finalizePipeline(pipeline); + } else { + LOG.debug("pipeline:{} not found", pipelineID); + } + } catch (Exception e) { + LOG.info("failed to close pipeline:{}", pipelineID, e); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 07ff2b0..ca2e878 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.pipelines; import java.util.ArrayList; import java.util.LinkedList; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -36,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class); - private final ArrayList<ActivePipelines> activePipelines; + protected final ArrayList<ActivePipelines> activePipelines; public PipelineManager() { activePipelines = new ArrayList<>(); @@ -45,7 +47,10 @@ public abstract class PipelineManager { } } - private static class ActivePipelines { + /** + * List of active pipelines. 
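For illustration (not part of the patch itself): PipelineCloseHandler now resolves the PipelineID itself and finalizes the pipeline through the PipelineSelector instead of going through the container Mapping. A sketch of the event wiring this implies, using the EventQueue from the hdds events framework; where the handler is actually registered is outside this excerpt, and the pipelineSelector and pipelineId values are assumed:

    // Illustrative only: registering the handler and firing a close event.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE,
        new PipelineCloseHandler(pipelineSelector));
    eventQueue.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineId);
    // onMessage looks up the pipeline; an unknown id is only logged at debug,
    // and failures from finalizePipeline are caught and logged.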
+ */ + public static class ActivePipelines { private final List<PipelineID> activePipelines; private final AtomicInteger pipelineIndex; @@ -55,10 +60,12 @@ public abstract class PipelineManager { } void addPipeline(PipelineID pipelineID) { - activePipelines.add(pipelineID); + if (!activePipelines.contains(pipelineID)) { + activePipelines.add(pipelineID); + } } - void removePipeline(PipelineID pipelineID) { + public void removePipeline(PipelineID pipelineID) { activePipelines.remove(pipelineID); } @@ -117,17 +124,6 @@ public abstract class PipelineManager { .addPipeline(pipeline.getId()); } - protected static int getReplicationCount(ReplicationFactor factor) { - switch (factor) { - case ONE: - return 1; - case THREE: - return 3; - default: - throw new IllegalArgumentException("Unexpected replication count"); - } - } - public abstract Pipeline allocatePipeline( ReplicationFactor replicationFactor); @@ -137,6 +133,14 @@ public abstract class PipelineManager { */ public abstract void initializePipeline(Pipeline pipeline) throws IOException; + public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) { + if (pipeline.addMember(dn) + &&(pipeline.getDatanodes().size() == pipeline.getFactor().getNumber()) + && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) { + addOpenPipeline(pipeline); + } + } + /** * Creates a pipeline with a specified replication factor and type. * @param replicationFactor - Replication Factor. @@ -157,27 +161,11 @@ public abstract class PipelineManager { * Remove the pipeline from active allocation. * @param pipeline pipeline to be finalized */ - public synchronized void finalizePipeline(Pipeline pipeline) { - activePipelines.get(pipeline.getFactor().ordinal()) - .removePipeline(pipeline.getId()); - } + public abstract boolean finalizePipeline(Pipeline pipeline); /** * * @param pipeline */ public abstract void closePipeline(Pipeline pipeline) throws IOException; - - /** - * list members in the pipeline. - * @return the datanode - */ - public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID) - throws IOException; - - /** - * Update the datanode list of the pipeline. - */ - public abstract void updatePipeline(PipelineID pipelineID, - List<DatanodeDetails> newDatanodes) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0956ee2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java new file mode 100644 index 0000000..933792b --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipelines; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.server + .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles Pipeline Reports from datanode. + */ +public class PipelineReportHandler implements + EventHandler<PipelineReportFromDatanode> { + + private static final Logger LOGGER = LoggerFactory + .getLogger(PipelineReportHandler.class); + private final PipelineSelector pipelineSelector; + + public PipelineReportHandler(PipelineSelector pipelineSelector) { + Preconditions.checkNotNull(pipelineSelector); + this.pipelineSelector = pipelineSelector; + } + + @Override + public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode, + EventPublisher publisher) { + Preconditions.checkNotNull(pipelineReportFromDatanode); + DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails(); + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + Preconditions.checkNotNull(dn, "Pipeline Report is " + + "missing DatanodeDetails."); + LOGGER.trace("Processing pipeline report for dn: {}", dn); + pipelineSelector.processPipelineReport(dn, pipelineReport); + } +}
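For illustration (not part of the patch itself), putting the SCM-side pieces together: the heartbeat dispatcher turns an incoming PipelineReportsProto into a PipelineReportFromDatanode event, PipelineReportHandler above hands it to the PipelineSelector, and PipelineManager.processPipelineReport puts a pipeline back on the active list once addMember() has seen replication-factor many datanodes and the pipeline is OPEN. A condensed sketch of driving the handler directly; the PipelineReportFromDatanode constructor is assumed to take the datanode details and the report (it is defined in SCMDatanodeHeartbeatDispatcher, which is not fully shown here), and the pipelineSelector, datanodeDetails and eventPublisher instances are assumed:

    // Illustrative only: feeding an (empty) pipeline report to the handler.
    PipelineReportsProto report = PipelineReportsProto.getDefaultInstance();
    PipelineReportHandler handler = new PipelineReportHandler(pipelineSelector);
    handler.onMessage(
        new PipelineReportFromDatanode(datanodeDetails, report), eventPublisher);
    // onMessage delegates to pipelineSelector.processPipelineReport(dn, report),
    // which is how reported pipelines become active again after an SCM restart.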
