This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-1569
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit fbef6c3d732e20ae45f9c5d43cd3d57670b90dd3
Author: timmycheng <timmych...@tencent.com>
AuthorDate: Wed Sep 25 11:40:12 2019 +0800

    HDDS-1569 Support creating multiple pipelines with same datanode.
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  3 +-
 .../ContainerPlacementPolicyFactory.java           |  8 +--
 .../hdds/scm/node/states/Node2PipelineMap.java     |  4 ++
 .../hdds/scm/pipeline/PipelineActionHandler.java   |  2 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 52 +++++++++++------
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  5 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 68 ++--------------------
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  1 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  | 65 ++++++++++-----------
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  3 +
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 11 +++-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  2 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    | 32 +++++-----
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 11 ++--
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  4 ++
 15 files changed, 122 insertions(+), 149 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ad7073e..fe51f51 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -317,7 +317,8 @@ public final class ScmConfigKeys {
   // the max number of pipelines a single datanode can be engaged in.
   public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
       "ozone.scm.datanode.max.pipeline.engagement";
-  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
 
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
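Since the default is now 0 (no cap), a deployment or test that wants the
heavy-node limit must opt in explicitly. A minimal sketch of opting in
programmatically, using only the stock configuration classes already
imported elsewhere in this patch; the limit of 3 is illustrative, not part
of the patch:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public class EngagementLimitExample {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Cap each datanode at 3 pipelines; 0 (the new default) means no limit.
        conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
        System.out.println(conf.getInt(
            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT));
      }
    }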
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
   }
 
-  public static PlacementPolicy getPolicy(Configuration conf,
-      final NodeManager nodeManager, NetworkTopology clusterMap,
-      final boolean fallback, SCMContainerPlacementMetrics metrics)
-      throws SCMException {
+  public static PlacementPolicy getPolicy(
+      Configuration conf, final NodeManager nodeManager,
+      NetworkTopology clusterMap, final boolean fallback,
+      SCMContainerPlacementMetrics metrics) throws SCMException {
     final Class<? extends PlacementPolicy> placementClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
             OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..496b9e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -71,6 +71,10 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
       UUID dnId = details.getUuid();
       dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
           .add(pipeline.getId());
+      dn2ObjectMap.computeIfPresent(dnId, (k, v) -> {
+        v.add(pipeline.getId());
+        return v;
+      });
     }
   }
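For readers unfamiliar with the concurrent multimap idiom used in
Node2PipelineMap above, a self-contained illustration (hypothetical String
keys stand in for datanode UUIDs and PipelineIDs):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class MultimapIdiom {
      public static void main(String[] args) {
        ConcurrentHashMap<String, Set<String>> dnToPipelines =
            new ConcurrentHashMap<>();
        // computeIfAbsent creates the per-datanode set on first use and
        // returns the existing set afterwards, so add() covers both cases.
        dnToPipelines.computeIfAbsent("dn-1", k -> ConcurrentHashMap.newKeySet())
            .add("pipeline-A");
        dnToPipelines.computeIfAbsent("dn-1", k -> ConcurrentHashMap.newKeySet())
            .add("pipeline-B");
        System.out.println(dnToPipelines.get("dn-1")); // e.g. [pipeline-A, pipeline-B]
      }
    }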
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 8d497fa..8d040f1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -57,7 +57,7 @@ public class PipelineActionHandler
       pipelineID = PipelineID.
           getFromProtobuf(action.getClosePipeline().getPipelineID());
       Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
-      LOG.error("Received pipeline action {} for {} from datanode {}. " +
+      LOG.info("Received pipeline action {} for {} from datanode {}. " +
           "Reason : {}", action.getAction(), pipeline,
           report.getDatanodeDetails(),
           action.getClosePipeline().getDetailedReason());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..df46fad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -79,8 +79,20 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * @return true if the datanode can be engaged in one more pipeline.
    */
   @VisibleForTesting
-  boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
-    return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+  boolean meetCriteria(DatanodeDetails datanodeDetails) {
+    if (heavyNodeCriteria == 0) {
+      // no limit applied.
+      return true;
+    }
+    boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+        < heavyNodeCriteria);
+    if (!meet) {
+      LOG.info("Pipeline Placement: can't place more pipelines on heavy " +
+          "datanode: " + datanodeDetails.getUuid().toString() +
+          " Heaviness: " + nodeManager.getPipelinesCount(datanodeDetails) +
+          " limit: " + heavyNodeCriteria);
+    }
+    return meet;
   }
 
   /**
@@ -102,18 +114,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
+    int initialHealthyNodesCount = healthyNodes.size();
     String msg;
-    if (healthyNodes.size() == 0) {
+    if (initialHealthyNodesCount == 0) {
       msg = "No healthy node found to allocate pipeline.";
       LOG.error(msg);
       throw new SCMException(msg, SCMException.ResultCodes
           .FAILED_TO_FIND_HEALTHY_NODES);
     }
 
-    if (healthyNodes.size() < nodesRequired) {
+    if (initialHealthyNodesCount < nodesRequired) {
       msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
               + " datanodes required. Found %d",
-          nodesRequired, healthyNodes.size());
+          nodesRequired, initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -122,13 +135,15 @@
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
     List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+        meetCriteria(d)).collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
       msg = String.format("Unable to find enough nodes that meet " +
           "the criterion of engaging in no more than %d pipelines." +
-          " Nodes required: %d Found: %d",
-          heavyNodeCriteria, nodesRequired, healthyList.size());
+          " Nodes required: %d Found: %d, healthy nodes count in " +
+          "NodeManager: %d.",
+          heavyNodeCriteria, nodesRequired, healthyList.size(),
+          initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -155,12 +170,11 @@
     List<DatanodeDetails> healthyNodes =
         filterViableNodes(excludedNodes, nodesRequired);
 
-    // Randomly picks nodes when all nodes are equal.
+    // Randomly picks nodes when all nodes are equal or factor is ONE.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
-    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
-      LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
-          "Required nodes: {}", nodesRequired);
+    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())
+        || nodesRequired == HddsProtos.ReplicationFactor.ONE.getNumber()) {
       return super.getResultSet(nodesRequired, healthyNodes);
     } else {
       // Since topology and rack awareness are available, picks nodes
@@ -188,8 +202,8 @@
     // First choose an anchor node randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor == null) {
-      LOG.error("Unable to find the first healthy nodes that " +
-          "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+          "that meet the criteria. Required nodes: {}, Found nodes: {}",
          nodesRequired, results.size());
      throw new SCMException("Unable to find required number of nodes.",
          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +218,8 @@
         healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
     if (nodeOnDifferentRack == null) {
-      LOG.error("Unable to find nodes on different racks that " +
-          "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find nodes on different " +
+          "racks that meet the criteria. Required nodes: {}, Found nodes: {}",
          nodesRequired, results.size());
      throw new SCMException("Unable to find required number of nodes.",
          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +242,9 @@
     }
 
     if (results.size() < nodesRequired) {
-      LOG.error("Unable to find the required number of healthy nodes that " +
-          "meet the criteria. Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
+      LOG.error("Pipeline Placement: Unable to find the required number of " +
+          "healthy nodes that meet the criteria. Required nodes: {}, " +
+          "Found nodes: {}", nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
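The heart of the new heavy-node filter is a strict less-than comparison plus
the zero-means-unlimited escape hatch. A self-contained restatement of that
rule, using plain ints instead of Ozone types, useful for eyeballing the
boundary behaviour (and why the tests below probe the limit with limit + 1):

    public class EngagementCheckSketch {
      // Mirrors meetCriteria() above: 0 disables the limit; otherwise a node
      // qualifies only while its pipeline count is strictly below the limit.
      static boolean meetsEngagementLimit(int currentPipelineCount, int limit) {
        if (limit == 0) {
          return true; // no limit applied
        }
        return currentPipelineCount < limit;
      }

      public static void main(String[] args) {
        System.out.println(meetsEngagementLimit(7, 0)); // true  (unlimited)
        System.out.println(meetsEngagementLimit(4, 5)); // true  (4 < 5)
        System.out.println(meetsEngagementLimit(5, 5)); // false (already at limit)
      }
    }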
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
 
   PipelineStateMap() {
 
     // TODO: Use TreeMap for range operations?
-    pipelineMap = new HashMap<>();
-    pipeline2container = new HashMap<>();
+    pipelineMap = new ConcurrentHashMap<>();
+    pipeline2container = new ConcurrentHashMap<>();
     query2OpenPipelines = new HashMap<>();
     initializeQueryMap();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9409728..6a51957 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.io.MultipleIOException;
@@ -44,19 +41,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Implements Api for creating ratis pipelines.
@@ -69,6 +61,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final PipelinePlacementPolicy placementPolicy;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -92,66 +85,13 @@ public class RatisPipelineProvider implements PipelineProvider {
     this.stateManager = stateManager;
     this.conf = conf;
     this.tlsConfig = tlsConfig;
-  }
-
-
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
-  private static PlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends PlacementPolicy> implClass =
-        (Class<? extends PlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
-
-    try {
-      Constructor<? extends PlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-//      LOG.error("Unhandled exception occurred, Placement policy will not " +
-//          "be functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "PlacementPolicy", e);
-    }
+    this.placementPolicy = new PipelinePlacementPolicy(nodeManager, conf);
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
-    // Get set of datanodes already used for ratis pipeline
-    Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
-        p -> p.getPipelineState().equals(PipelineState.OPEN) ||
-            p.getPipelineState().equals(PipelineState.DORMANT) ||
-            p.getPipelineState().equals(PipelineState.ALLOCATED))
-        .forEach(p -> dnsUsed.addAll(p.getNodes()));
-
-    // Get list of healthy nodes
-    List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY)
-            .parallelStream()
-            .filter(dn -> !dnsUsed.contains(dn))
-            .limit(factor.getNumber())
-            .collect(Collectors.toList());
-    if (dns.size() < factor.getNumber()) {
-      String e = String
-          .format("Cannot create pipeline of factor %d using %d nodes.",
-              factor.getNumber(), dns.size());
-      throw new InsufficientDatanodesException(e);
-    }
+    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+        null, factor.getNumber(), 0);
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
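With the ad-hoc node filtering removed, create() delegates node selection
entirely to the shared placement policy. A minimal sketch of that call path,
assuming hadoop-hdds-server-scm on the classpath; the wrapper class and
method names are illustrative, only the chooseDatanodes() call mirrors the
patch:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
    import org.apache.hadoop.hdds.scm.node.NodeManager;
    import org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy;

    final class PlacementSketch {
      // Mirrors the new create() path: delegate node selection to the policy.
      static List<DatanodeDetails> pickNodes(NodeManager nodeManager,
          Configuration conf) throws IOException {
        PipelinePlacementPolicy policy =
            new PipelinePlacementPolicy(nodeManager, conf);
        // excluded = null, favored = null, sizeRequired = 0
        // (pipeline placement does not consider space left on the node).
        return policy.chooseDatanodes(null, null,
            ReplicationFactor.THREE.getNumber(), 0);
      }
    }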
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..a927d56 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -164,6 +164,7 @@ public class SCMPipelineManager implements PipelineManager {
       throw idEx;
     } catch (IOException ex) {
       metrics.incNumPipelineCreationFailed();
+      LOG.error("Pipeline creation failed.", ex);
       throw ex;
     } finally {
       lock.writeLock().unlock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..b3aac5e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -56,7 +55,7 @@ public class HealthyPipelineSafeModeRule
   private final PipelineManager pipelineManager;
   private final int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Set<DatanodeDetails> processedDatanodeDetails =
+  private final Set<PipelineID> processedPipelineIDs =
       new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
@@ -116,46 +115,46 @@
     // processed report event, we should not consider this pipeline report
     // from datanode again during threshold calculation.
     Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
-    if (!processedDatanodeDetails.contains(
-        pipelineReportFromDatanode.getDatanodeDetails())) {
-
-      Pipeline pipeline;
-      PipelineReportsProto pipelineReport =
-          pipelineReportFromDatanode.getReport();
-
-      for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-        PipelineID pipelineID = PipelineID
-            .getFromProtobuf(report.getPipelineID());
-        try {
-          pipeline = pipelineManager.getPipeline(pipelineID);
-        } catch (PipelineNotFoundException e) {
-          continue;
-        }
-
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
-          // If the pipeline is open state mean, all 3 datanodes are reported
-          // for this pipeline.
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-        }
+
+    Pipeline pipeline;
+    PipelineReportsProto pipelineReport =
+        pipelineReportFromDatanode.getReport();
+
+    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+      PipelineID pipelineID = PipelineID
+          .getFromProtobuf(report.getPipelineID());
+      if (processedPipelineIDs.contains(pipelineID)) {
+        continue;
+      }
+
+      try {
+        pipeline = pipelineManager.getPipeline(pipelineID);
+      } catch (PipelineNotFoundException e) {
+        continue;
       }
-      if (scmInSafeMode()) {
-        SCMSafeModeManager.getLogger().info(
-            "SCM in safe mode. Healthy pipelines reported count is {}, " +
-                "required healthy pipeline reported count is {}",
-            currentHealthyPipelineCount, healthyPipelineThresholdCount);
+
+      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+          pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+        // If the pipeline is in open state, it means all 3 datanodes have
+        // reported for this pipeline.
+        currentHealthyPipelineCount++;
+        getSafeModeMetrics().incCurrentHealthyPipelinesCount();
       }
-      processedDatanodeDetails.add(dnDetails);
+      processedPipelineIDs.add(pipelineID);
     }
 
+    if (scmInSafeMode()) {
+      SCMSafeModeManager.getLogger().info(
+          "SCM in safe mode. Healthy pipelines reported count is {}, " +
+              "required healthy pipeline reported count is {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount);
+    }
   }
 
   @Override
   protected void cleanup() {
-    processedDatanodeDetails.clear();
+    processedPipelineIDs.clear();
   }
 
   @VisibleForTesting
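The switch from tracking reporting datanodes to tracking PipelineIDs matters
once a datanode can sit on several pipelines: the old code skipped a
datanode's entire report after its first event, while the new code counts
each pipeline exactly once no matter how many members report it. A
self-contained illustration of the dedup, with hypothetical String IDs in
place of PipelineID:

    import java.util.HashSet;
    import java.util.Set;

    final class SafeModeCountSketch {
      private final Set<String> processedPipelineIds = new HashSet<>();
      private int currentHealthyPipelineCount = 0;

      void onPipelineReport(String pipelineId, boolean openFactorThree) {
        if (!processedPipelineIds.add(pipelineId)) {
          return; // already counted from an earlier datanode's report
        }
        if (openFactorThree) {
          currentHealthyPipelineCount++;
        }
      }

      int count() {
        return currentHealthyPipelineCount;
      }
    }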
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..20cc3cf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -64,6 +64,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test DeadNodeHandler.
 */
@@ -84,6 +86,7 @@ public class TestDeadNodeHandler {
     storageDir = GenericTestUtils.getTempPath(
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..e200d6f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for PipelinePlacementPolicy.
 */
 public class TestPipelinePlacementPolicy {
   private MockNodeManager nodeManager;
+  private OzoneConfiguration conf;
   private PipelinePlacementPolicy placementPolicy;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
 
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true,
         PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
     placementPolicy =
-        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+        new PipelinePlacementPolicy(nodeManager, conf);
   }
 
   @Test
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
     }
 
     int considerHeavyCount =
-        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+        conf.getInt(
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
 
     Node2PipelineMap mockMap = new Node2PipelineMap();
     for (DatanodeDetails node : nodes) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..9bccb1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4..0a8c5ad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,6 +32,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for RatisPipelineProvider.
 */
@@ -46,14 +47,17 @@ public class TestRatisPipelineProvider {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
     stateManager = new PipelineStateManager(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     provider = new MockRatisPipelineProvider(nodeManager,
-        stateManager, new OzoneConfiguration());
+        stateManager, conf);
   }
 
   private void createPipelineAndAssertions(
       HddsProtos.ReplicationFactor factor) throws IOException {
     Pipeline pipeline = provider.create(factor);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
@@ -61,10 +65,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should not overlap with the previous created pipeline
-    Assert.assertTrue(
-        CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
-            .isEmpty());
+    nodeManager.addPipeline(pipeline1);
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
@@ -77,6 +78,7 @@ public class TestRatisPipelineProvider {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline = provider.create(factor);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
@@ -86,11 +88,7 @@ public class TestRatisPipelineProvider {
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should overlap with the previous created pipeline,
-    // and one datanode should overlap between the two types.
-    Assert.assertEquals(
-        CollectionUtils.intersection(pipeline.getNodes(),
-            pipeline1.getNodes()).size(), 1);
+    nodeManager.addPipeline(pipeline1);
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
@@ -154,6 +152,10 @@ public class TestRatisPipelineProvider {
         .build();
     stateManager.addPipeline(openPipeline);
+    nodeManager.addPipeline(openPipeline);
+    for (DatanodeDetails node : openPipeline.getNodes()) {
+      System.out.println("open pipeline contains " + node.getUuid());
+    }
 
     // Use up next 3 DNs also for an open pipeline.
     List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes()
@@ -166,6 +168,7 @@ public class TestRatisPipelineProvider {
         .setId(PipelineID.randomId())
         .build();
     stateManager.addPipeline(anotherOpenPipeline);
+    nodeManager.addPipeline(anotherOpenPipeline);
 
     // Use up next 3 DNs also for a closed pipeline.
     List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes()
@@ -178,6 +181,7 @@ public class TestRatisPipelineProvider {
         .setId(PipelineID.randomId())
         .build();
     stateManager.addPipeline(anotherClosedPipeline);
+    nodeManager.addPipeline(anotherClosedPipeline);
 
     Pipeline pipeline = provider.create(factor);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
@@ -187,12 +191,6 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
 
-    // Pipline nodes cannot be from open pipelines.
-    Assert.assertTrue(
-        pipelineNodes.parallelStream().filter(dn ->
-        (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
-        .count() == 0);
-
     // Since we have only 10 DNs, at least 1 pipeline node should have been
     // from the closed pipeline DN list.
     Assert.assertTrue(pipelineNodes.parallelStream().filter(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..9d59960 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -58,6 +60,7 @@ public class TestSCMPipelineManager {
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,8 @@ public class TestSCMPipelineManager {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
       Assert.fail();
-    } catch (InsufficientDatanodesException idEx) {
-      Assert.assertEquals(
-          "Cannot create pipeline of factor 3 using 1 nodes.",
-          idEx.getMessage());
+    } catch (SCMException idEx) {
+      // pipeline creation failed this time.
     }
 
     metrics = getMetrics(
@@ -266,7 +267,7 @@
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
-    Assert.assertTrue(numPipelineCreateFailed == 0);
+    Assert.assertTrue(numPipelineCreateFailed == 1);
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0aba968..19c1406 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -36,6 +36,8 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Interface used for MiniOzoneClusters.
 */
@@ -269,6 +271,8 @@ public interface MiniOzoneCluster {
 
     protected Builder(OzoneConfiguration conf) {
       this.conf = conf;
+      // MiniOzoneCluster doesn't have a pipeline engagement limit.
+      conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
       setClusterId(UUID.randomUUID().toString());
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org