This is an automated email from the ASF dual-hosted git repository. sammichen pushed a commit to branch HDDS-1564 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 07e5348cdd9fac3997db7f1d729e456c1287c495 Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Tue Oct 29 12:46:00 2019 +0800 HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng. This closes #28 --- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 10 +- .../common/src/main/resources/ozone-default.xml | 15 +- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 4 + .../ContainerPlacementPolicyFactory.java | 8 +- .../hdds/scm/node/states/Node2PipelineMap.java | 2 +- .../scm/pipeline/BackgroundPipelineCreator.java | 1 + .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 89 +++++++--- .../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 5 +- .../hdds/scm/pipeline/RatisPipelineProvider.java | 136 +++++++++------ .../hdds/scm/pipeline/RatisPipelineUtils.java | 4 +- .../hdds/scm/pipeline/SCMPipelineManager.java | 13 +- .../hdds/scm/pipeline/SCMPipelineMetrics.java | 8 + .../scm/safemode/HealthyPipelineSafeModeRule.java | 65 ++++---- .../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 3 + .../scm/pipeline/TestPipelinePlacementPolicy.java | 15 +- .../hdds/scm/pipeline/TestPipelineClose.java | 2 +- .../TestRatisPipelineCreateAndDestroy.java | 24 ++- .../scm/pipeline/TestRatisPipelineProvider.java | 184 +++++++++++++++++++++ .../hdds/scm/pipeline/TestSCMPipelineManager.java | 13 +- .../hadoop/hdds/scm/pipeline/TestSCMRestart.java | 5 +- .../safemode/TestSCMSafeModeWithPipelineRules.java | 3 + .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 ++ .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 6 +- .../ozone/client/rpc/Test2WayCommitInRatis.java | 1 + .../ozone/client/rpc/TestBlockOutputStream.java | 1 + .../rpc/TestBlockOutputStreamWithFailures.java | 7 +- .../hadoop/ozone/client/rpc/TestCommitWatcher.java | 1 + .../rpc/TestContainerReplicationEndToEnd.java | 5 +- .../client/rpc/TestContainerStateMachine.java | 5 +- .../client/rpc/TestDeleteWithSlowFollower.java | 12 +- .../client/rpc/TestFailureHandlingByClient.java | 4 +- .../client/rpc/TestHybridPipelineOnDatanode.java | 3 +- .../ozone/client/rpc/TestKeyInputStream.java | 1 + .../rpc/TestMultiBlockWritesWithDnFailures.java | 8 +- .../rpc/TestOzoneClientRetriesOnException.java | 1 + .../client/rpc/TestOzoneRpcClientAbstract.java | 1 + .../ozone/client/rpc/TestWatchForCommit.java | 3 + .../TestCloseContainerByPipeline.java | 5 + .../TestSCMContainerPlacementPolicyMetrics.java | 1 + .../hadoop/ozone/scm/node/TestQueryNode.java | 3 + .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java | 3 + .../hadoop/ozone/freon/TestDataValidate.java | 2 +- .../ozone/freon/TestFreonWithPipelineDestroy.java | 1 + 43 files changed, 528 insertions(+), 167 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index d254197..5cb65c2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -322,7 +322,15 @@ public final class ScmConfigKeys { // the max number of pipelines can a single datanode be engaged in. public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT = "ozone.scm.datanode.max.pipeline.engagement"; - public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5; + // Setting to zero by default means this limit doesn't take effect. 
+ public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0; + + // Upper limit for how many pipelines can be created. + // Only for test purposes now. + public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT = + "ozone.scm.pipeline.number.limit"; + // Setting to zero by default means this limit doesn't take effect. + public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0; public static final String OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY = diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index c971bc0..c1887d2 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -832,10 +832,19 @@ </description> </property> <property> - <name>ozone.scm.datanode.max.pipeline.engagement</name> - <value>5</value> + <name>ozone.scm.datanode.max.pipeline.engagement</name> + <value>0</value> + <tag>OZONE, SCM, PIPELINE</tag> + <description>Max number of pipelines a datanode can be engaged in. + </description> + </property> + <property> + <name>ozone.scm.pipeline.number.limit</name> + <value>0</value> <tag>OZONE, SCM, PIPELINE</tag> - <description>Max number of pipelines per datanode can be engaged in. + <description>Upper limit for how many pipelines can be OPEN in SCM. + The default of 0 means there is no limit. Otherwise, this caps the + number of pipelines which can be OPEN. </description> </property> <property> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 845bdf1..00ad58a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -196,6 +196,10 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { // TODO: #CLUTIL Remove creation logic when all replication types and // factors are handled by pipeline creator pipeline = pipelineManager.createPipeline(type, factor); + } catch (SCMException se) { + LOG.warn("Pipeline creation failed for type:{} factor:{}. " + + "Datanodes may be used up.", type, factor, se); + break; } catch (IOException e) { LOG.warn("Pipeline creation failed for type:{} factor:{}. 
Retrying " + "get pipelines call once.", type, factor, e); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java index adaeb87..74431f9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java @@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory { } - public static PlacementPolicy getPolicy(Configuration conf, - final NodeManager nodeManager, NetworkTopology clusterMap, - final boolean fallback, SCMContainerPlacementMetrics metrics) - throws SCMException{ + public static PlacementPolicy getPolicy( + Configuration conf, final NodeManager nodeManager, + NetworkTopology clusterMap, final boolean fallback, + SCMContainerPlacementMetrics metrics) throws SCMException{ final Class<? extends PlacementPolicy> placementClass = conf .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java index 714188d..18809ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java @@ -80,7 +80,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> { dn2ObjectMap.computeIfPresent(dnId, (k, v) -> { v.remove(pipeline.getId()); - return v; + return v.isEmpty() ? 
null : v; }); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java index 6873566..6952f74 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java @@ -96,6 +96,7 @@ class BackgroundPipelineCreator { if (scheduler.isClosed()) { break; } + pipelineManager.createPipeline(type, factor); } catch (IOException ioe) { break; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java index 1983ed6..23eb574 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -52,6 +53,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { static final Logger LOG = LoggerFactory.getLogger(PipelinePlacementPolicy.class); private final NodeManager nodeManager; + private final PipelineStateManager stateManager; private final Configuration conf; private final int heavyNodeCriteria; @@ -59,15 +61,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { * Constructs a pipeline placement with considering network topology, * load balancing and rack awareness. * - * @param nodeManager Node Manager + * @param nodeManager NodeManager + * @param stateManager PipelineStateManager * @param conf Configuration */ - public PipelinePlacementPolicy( - final NodeManager nodeManager, final Configuration conf) { + public PipelinePlacementPolicy(final NodeManager nodeManager, + final PipelineStateManager stateManager, final Configuration conf) { super(nodeManager, conf); this.nodeManager = nodeManager; this.conf = conf; - heavyNodeCriteria = conf.getInt( + this.stateManager = stateManager; + this.heavyNodeCriteria = conf.getInt( ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); } @@ -76,11 +80,46 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { * Returns true if this node meets the criteria. * * @param datanodeDetails DatanodeDetails + * @param nodesRequired nodes required count * @return true if we have enough space. */ @VisibleForTesting - boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) { - return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit); + boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) { + if (heavyNodeCriteria == 0) { + // no limit applied. + return true; + } + // Datanodes from pipeline in some states can also be considered available + // for pipeline allocation. Thus the number of these pipeline shall be + // deducted from total heaviness calculation. 
+ int pipelineNumDeductable = 0; + Set<PipelineID> pipelines = nodeManager.getPipelines(datanodeDetails); + for (PipelineID pid : pipelines) { + Pipeline pipeline; + try { + pipeline = stateManager.getPipeline(pid); + } catch (PipelineNotFoundException e) { + LOG.error("Pipeline not found in pipeline state manager during" + + " pipeline creation. PipelineID: " + pid + + " exception: " + e.getMessage()); + continue; + } + if (pipeline != null && + pipeline.getFactor().getNumber() == nodesRequired && + pipeline.getType() == HddsProtos.ReplicationType.RATIS && + pipeline.getPipelineState() == Pipeline.PipelineState.CLOSED) { + pipelineNumDeductable++; + } + } + boolean meet = (nodeManager.getPipelinesCount(datanodeDetails) + - pipelineNumDeductable) < heavyNodeCriteria; + if (!meet) { + LOG.info("Pipeline Placement: can't place more pipelines on heavy " + + "datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " + + nodeManager.getPipelinesCount(datanodeDetails) + " limit: " + + heavyNodeCriteria); + } + return meet; } /** @@ -102,18 +141,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { if (excludedNodes != null) { healthyNodes.removeAll(excludedNodes); } + int initialHealthyNodesCount = healthyNodes.size(); String msg; - if (healthyNodes.size() == 0) { + if (initialHealthyNodesCount == 0) { msg = "No healthy node found to allocate pipeline."; LOG.error(msg); throw new SCMException(msg, SCMException.ResultCodes .FAILED_TO_FIND_HEALTHY_NODES); } - if (healthyNodes.size() < nodesRequired) { + if (initialHealthyNodesCount < nodesRequired) { msg = String.format("Not enough healthy nodes to allocate pipeline. %d " + " datanodes required. Found %d", - nodesRequired, healthyNodes.size()); + nodesRequired, initialHealthyNodesCount); LOG.error(msg); throw new SCMException(msg, SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); @@ -121,14 +161,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { // filter nodes that meet the size and pipeline engagement criteria. // Pipeline placement doesn't take node space left into account. - List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d -> - meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList()); + List<DatanodeDetails> healthyList = healthyNodes.stream() + .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired) + .collect(Collectors.toList()); if (healthyList.size() < nodesRequired) { msg = String.format("Unable to find enough nodes that meet " + "the criteria that cannot engage in more than %d pipelines." + - " Nodes required: %d Found: %d", - heavyNodeCriteria, nodesRequired, healthyList.size()); + " Nodes required: %d Found: %d, healthy nodes count in " + + "NodeManager: %d.", + heavyNodeCriteria, nodesRequired, healthyList.size(), + initialHealthyNodesCount); LOG.error(msg); throw new SCMException(msg, SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); @@ -154,13 +197,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { // and make sure excludedNodes are excluded from list. List<DatanodeDetails> healthyNodes = filterViableNodes(excludedNodes, nodesRequired); - - // Randomly picks nodes when all nodes are equal. + + // Randomly picks nodes when all nodes are equal or factor is ONE. // This happens when network topology is absent or // all nodes are on the same rack. if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) { - LOG.info("All nodes are considered equal. 
Now randomly pick nodes. " + - "Required nodes: {}", nodesRequired); return super.getResultSet(nodesRequired, healthyNodes); } else { // Since topology and rack awareness are available, picks nodes @@ -188,8 +229,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { // First choose an anchor nodes randomly DatanodeDetails anchor = chooseNode(healthyNodes); if (anchor == null) { - LOG.error("Unable to find the first healthy nodes that " + - "meet the criteria. Required nodes: {}, Found nodes: {}", + LOG.error("Pipeline Placement: Unable to find the first healthy nodes " + + "that meet the criteria. Required nodes: {}, Found nodes: {}", nodesRequired, results.size()); throw new SCMException("Unable to find required number of nodes.", SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); @@ -204,8 +245,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { healthyNodes, exclude, nodeManager.getClusterNetworkTopologyMap(), anchor); if (nodeOnDifferentRack == null) { - LOG.error("Unable to find nodes on different racks that " + - "meet the criteria. Required nodes: {}, Found nodes: {}", + LOG.error("Pipeline Placement: Unable to find nodes on different racks " + + " that meet the criteria. Required nodes: {}, Found nodes: {}", nodesRequired, results.size()); throw new SCMException("Unable to find required number of nodes.", SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); @@ -228,9 +269,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { } if (results.size() < nodesRequired) { - LOG.error("Unable to find the required number of healthy nodes that " + - "meet the criteria. Required nodes: {}, Found nodes: {}", - nodesRequired, results.size()); + LOG.error("Pipeline Placement: Unable to find the required number of " + + "healthy nodes that meet the criteria. Required nodes: {}, " + + "Found nodes: {}", nodesRequired, results.size()); throw new SCMException("Unable to find required number of nodes.", SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 443378c..8e0f32d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -52,8 +53,8 @@ class PipelineStateMap { PipelineStateMap() { // TODO: Use TreeMap for range operations? 
- pipelineMap = new HashMap<>(); - pipeline2container = new HashMap<>(); + pipelineMap = new ConcurrentHashMap<>(); + pipeline2container = new ConcurrentHashMap<>(); query2OpenPipelines = new HashMap<>(); initializeQueryMap(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 38db5e8..b402929 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -20,13 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.PlacementPolicy; -import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; import org.apache.hadoop.io.MultipleIOException; @@ -44,8 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -69,6 +66,9 @@ public class RatisPipelineProvider implements PipelineProvider { private final NodeManager nodeManager; private final PipelineStateManager stateManager; private final Configuration conf; + private final PipelinePlacementPolicy placementPolicy; + private int pipelineNumberLimit; + private int maxPipelinePerDatanode; // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines. private final int parallelismForPool = 3; @@ -92,65 +92,93 @@ public class RatisPipelineProvider implements PipelineProvider { this.stateManager = stateManager; this.conf = conf; this.tlsConfig = tlsConfig; + this.placementPolicy = + new PipelinePlacementPolicy(nodeManager, stateManager, conf); + this.pipelineNumberLimit = conf.getInt( + ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, + ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT); + this.maxPipelinePerDatanode = conf.getInt( + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); } + private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor) + throws SCMException { + Set<DatanodeDetails> dnsUsed = new HashSet<>(); + stateManager.getPipelines(ReplicationType.RATIS, factor) + .stream().filter( + p -> p.getPipelineState().equals(PipelineState.OPEN) || + p.getPipelineState().equals(PipelineState.DORMANT) || + p.getPipelineState().equals(PipelineState.ALLOCATED)) + .forEach(p -> dnsUsed.addAll(p.getNodes())); - /** - * Create pluggable container placement policy implementation instance. - * - * @param nodeManager - SCM node manager. - * @param conf - configuration. 
- * @return SCM container placement policy implementation instance. - */ - @SuppressWarnings("unchecked") - // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy? - private static PlacementPolicy createContainerPlacementPolicy( - final NodeManager nodeManager, final Configuration conf) { - Class<? extends PlacementPolicy> implClass = - (Class<? extends PlacementPolicy>) conf.getClass( - ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementRandom.class); + // Get list of healthy nodes + List<DatanodeDetails> dns = nodeManager + .getNodes(HddsProtos.NodeState.HEALTHY) + .parallelStream() + .filter(dn -> !dnsUsed.contains(dn)) + .limit(factor.getNumber()) + .collect(Collectors.toList()); + if (dns.size() < factor.getNumber()) { + String e = String + .format("Cannot create pipeline of factor %d using %d nodes." + + " Used %d nodes. Healthy nodes %d", factor.getNumber(), + dns.size(), dnsUsed.size(), + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size()); + throw new SCMException(e, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return dns; + } - try { - Constructor<? extends PlacementPolicy> ctor = - implClass.getDeclaredConstructor(NodeManager.class, - Configuration.class); - return ctor.newInstance(nodeManager, conf); - } catch (RuntimeException e) { - throw e; - } catch (InvocationTargetException e) { - throw new RuntimeException(implClass.getName() - + " could not be constructed.", e.getCause()); - } catch (Exception e) { -// LOG.error("Unhandled exception occurred, Placement policy will not " + -// "be functional."); - throw new IllegalArgumentException("Unable to load " + - "PlacementPolicy", e); + private boolean exceedPipelineNumberLimit(ReplicationFactor factor) { + if (factor != ReplicationFactor.THREE) { + // Only put limits for Factor THREE pipelines. 
+ return false; + } + // Per datanode limit + if (maxPipelinePerDatanode > 0) { + return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() - + stateManager.getPipelines(ReplicationType.RATIS, factor, + Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode * + nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) / + factor.getNumber(); } + + // Global limit + if (pipelineNumberLimit > 0) { + return (stateManager.getPipelines(ReplicationType.RATIS, + ReplicationFactor.THREE).size() - stateManager.getPipelines( + ReplicationType.RATIS, ReplicationFactor.THREE, + Pipeline.PipelineState.CLOSED).size()) > + (pipelineNumberLimit - stateManager.getPipelines( + ReplicationType.RATIS, ReplicationFactor.ONE).size()); + } + + return false; } @Override public Pipeline create(ReplicationFactor factor) throws IOException { - // Get set of datanodes already used for ratis pipeline - Set<DatanodeDetails> dnsUsed = new HashSet<>(); - stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter( - p -> p.getPipelineState().equals(PipelineState.OPEN) || - p.getPipelineState().equals(PipelineState.DORMANT) || - p.getPipelineState().equals(PipelineState.ALLOCATED)) - .forEach(p -> dnsUsed.addAll(p.getNodes())); + if (exceedPipelineNumberLimit(factor)) { + throw new SCMException("Ratis pipeline number meets the limit: " + + pipelineNumberLimit + " factor : " + + factor.getNumber(), + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } - // Get list of healthy nodes - List<DatanodeDetails> dns = - nodeManager.getNodes(NodeState.HEALTHY) - .parallelStream() - .filter(dn -> !dnsUsed.contains(dn)) - .limit(factor.getNumber()) - .collect(Collectors.toList()); - if (dns.size() < factor.getNumber()) { - String e = String - .format("Cannot create pipeline of factor %d using %d nodes.", - factor.getNumber(), dns.size()); - throw new InsufficientDatanodesException(e); + List<DatanodeDetails> dns; + + switch(factor) { + case ONE: + dns = pickNodesNeverUsed(ReplicationFactor.ONE); + break; + case THREE: + dns = placementPolicy.chooseDatanodes(null, + null, factor.getNumber(), 0); + break; + default: + throw new IllegalStateException("Unknown factor: " + factor.name()); } Pipeline pipeline = Pipeline.newBuilder() diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 20fa092..31181a0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -66,8 +66,8 @@ public final class RatisPipelineUtils { try { destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig); } catch (IOException e) { - LOG.warn("Pipeline destroy failed for pipeline={} dn={}", - pipeline.getId(), dn); + LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}", + pipeline.getId(), dn, e.getMessage()); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 0964f6d..b41c595 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -54,10 +54,6 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.hdds.scm - .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm - .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB; /** @@ -97,8 +93,8 @@ public class SCMPipelineManager implements PipelineManager { scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); this.backgroundPipelineCreator = new BackgroundPipelineCreator(this, scheduler, conf); - int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, - OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB, + ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT); final File metaDir = ServerUtils.getScmDbDir(conf); final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB); this.pipelineStore = @@ -160,10 +156,9 @@ public class SCMPipelineManager implements PipelineManager { metrics.incNumPipelineCreated(); metrics.createPerPipelineMetrics(pipeline); return pipeline; - } catch (InsufficientDatanodesException idEx) { - throw idEx; } catch (IOException ex) { metrics.incNumPipelineCreationFailed(); + LOG.error("Pipeline creation failed.", ex); throw ex; } finally { lock.writeLock().unlock(); @@ -172,7 +167,7 @@ public class SCMPipelineManager implements PipelineManager { @Override public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor, - List<DatanodeDetails> nodes) { + List<DatanodeDetails> nodes) { // This will mostly be used to create dummy pipeline for SimplePipelines. // We don't update the metrics for SimplePipelines. lock.writeLock().lock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java index b6a1445..9427391 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java @@ -125,6 +125,14 @@ public final class SCMPipelineMetrics implements MetricsSource { } /** + * Get the number of pipeline created. + * @return number of pipeline + */ + long getNumPipelineCreated() { + return numPipelineCreated.value(); + } + + /** * Increments number of failed pipeline creation count. 
*/ void incNumPipelineCreationFailed() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java index 7a00d76..b3aac5e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; @@ -56,7 +55,7 @@ public class HealthyPipelineSafeModeRule private final PipelineManager pipelineManager; private final int healthyPipelineThresholdCount; private int currentHealthyPipelineCount = 0; - private final Set<DatanodeDetails> processedDatanodeDetails = + private final Set<PipelineID> processedPipelineIDs = new HashSet<>(); HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue, @@ -116,46 +115,46 @@ public class HealthyPipelineSafeModeRule // processed report event, we should not consider this pipeline report // from datanode again during threshold calculation. Preconditions.checkNotNull(pipelineReportFromDatanode); - DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails(); - if (!processedDatanodeDetails.contains( - pipelineReportFromDatanode.getDatanodeDetails())) { - - Pipeline pipeline; - PipelineReportsProto pipelineReport = - pipelineReportFromDatanode.getReport(); - - for (PipelineReport report : pipelineReport.getPipelineReportList()) { - PipelineID pipelineID = PipelineID - .getFromProtobuf(report.getPipelineID()); - try { - pipeline = pipelineManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - continue; - } - - if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && - pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) { - // If the pipeline is open state mean, all 3 datanodes are reported - // for this pipeline. - currentHealthyPipelineCount++; - getSafeModeMetrics().incCurrentHealthyPipelinesCount(); - } + + Pipeline pipeline; + PipelineReportsProto pipelineReport = + pipelineReportFromDatanode.getReport(); + + for (PipelineReport report : pipelineReport.getPipelineReportList()) { + PipelineID pipelineID = PipelineID + .getFromProtobuf(report.getPipelineID()); + if (processedPipelineIDs.contains(pipelineID)) { + continue; + } + + try { + pipeline = pipelineManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + continue; } - if (scmInSafeMode()) { - SCMSafeModeManager.getLogger().info( - "SCM in safe mode. Healthy pipelines reported count is {}, " + - "required healthy pipeline reported count is {}", - currentHealthyPipelineCount, healthyPipelineThresholdCount); + + if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE && + pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) { + // If the pipeline is open state mean, all 3 datanodes are reported + // for this pipeline. 
+ currentHealthyPipelineCount++; + getSafeModeMetrics().incCurrentHealthyPipelinesCount(); } - processedDatanodeDetails.add(dnDetails); + processedPipelineIDs.add(pipelineID); } + if (scmInSafeMode()) { + SCMSafeModeManager.getLogger().info( + "SCM in safe mode. Healthy pipelines reported count is {}, " + + "required healthy pipeline reported count is {}", + currentHealthyPipelineCount, healthyPipelineThresholdCount); + } } @Override protected void cleanup() { - processedDatanodeDetails.clear(); + processedPipelineIDs.clear(); } @VisibleForTesting diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 7657b54..20cc3cf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -64,6 +64,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; + /** * Test DeadNodeHandler. */ @@ -84,6 +86,7 @@ public class TestDeadNodeHandler { storageDir = GenericTestUtils.getTempPath( TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID()); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0); eventQueue = new EventQueue(); scm = HddsTestUtils.getScm(conf); nodeManager = (SCMNodeManager) scm.getScmNodeManager(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java index 2e0d0b1..1e34039 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java @@ -34,11 +34,14 @@ import org.junit.Test; import java.util.*; import java.util.stream.Collectors; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; + /** * Test for PipelinePlacementPolicy. */ public class TestPipelinePlacementPolicy { private MockNodeManager nodeManager; + private OzoneConfiguration conf; private PipelinePlacementPolicy placementPolicy; private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10; @@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy { public void init() throws Exception { nodeManager = new MockNodeManager(true, PIPELINE_PLACEMENT_MAX_NODES_COUNT); - placementPolicy = - new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration()); + conf = new OzoneConfiguration(); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5); + placementPolicy = new PipelinePlacementPolicy( + nodeManager, new PipelineStateManager(conf), conf); } @Test @@ -123,7 +128,7 @@ public class TestPipelinePlacementPolicy { public void testHeavyNodeShouldBeExcluded() throws SCMException{ List<DatanodeDetails> healthyNodes = nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); - int nodesRequired = healthyNodes.size()/2; + int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber(); // only minority of healthy NODES are heavily engaged in pipelines. 
int minorityHeavy = healthyNodes.size()/2 - 1; List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes( @@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy { } int considerHeavyCount = - ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1; + conf.getInt( + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1; Node2PipelineMap mockMap = new Node2PipelineMap(); for (DatanodeDetails node : nodes) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index c583559..9bccb1a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -169,7 +169,7 @@ public class TestPipelineClose { new PipelineActionHandler(pipelineManager, conf); pipelineActionHandler .onMessage(pipelineActionsFromDatanode, new EventQueue()); - Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2)); + Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10)); OzoneContainer ozoneContainer = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() .getContainer(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java index 6ace90c..cbe450e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -34,6 +35,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** @@ -48,9 +50,12 @@ public class TestRatisPipelineCreateAndDestroy { public void init(int numDatanodes) throws Exception { conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, GenericTestUtils.getRandomizedTempPath()); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2); + cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(numDatanodes) - .setHbInterval(1000) + .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3) + .setHbInterval(2000) .setHbProcessorInterval(1000) .build(); cluster.waitForClusterToBeReady(); @@ -103,7 +108,9 @@ public class TestRatisPipelineCreateAndDestroy { } catch (IOException ioe) { // As now all datanodes are shutdown, they move to stale state, there // will be no sufficient datanodes to create the pipeline. 
- Assert.assertTrue(ioe instanceof InsufficientDatanodesException); + Assert.assertTrue(ioe instanceof SCMException); + Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE, + ((SCMException) ioe).getResult()); } // make sure pipelines is destroyed @@ -116,9 +123,14 @@ public class TestRatisPipelineCreateAndDestroy { for (Pipeline pipeline : pipelines) { pipelineManager.finalizeAndDestroyPipeline(pipeline, false); } - // make sure pipelines is created after node start - pipelineManager.triggerPipelineCreation(); - waitForPipelines(1); + + if (cluster.getStorageContainerManager() + .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) >= + HddsProtos.ReplicationFactor.THREE.getNumber()) { + // make sure pipelines is created after node start + pipelineManager.triggerPipelineCreation(); + waitForPipelines(1); + } } private void waitForPipelines(int numPipelines) @@ -126,6 +138,6 @@ public class TestRatisPipelineCreateAndDestroy { GenericTestUtils.waitFor(() -> pipelineManager .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) - .size() == numPipelines, 100, 40000); + .size() >= numPipelines, 100, 40000); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java new file mode 100644 index 0000000..7862605 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; + +/** + * Test for RatisPipelineProvider. 
+ */ +public class TestRatisPipelineProvider { + + private NodeManager nodeManager; + private PipelineProvider provider; + private PipelineStateManager stateManager; + + @Before + public void init() throws Exception { + nodeManager = new MockNodeManager(true, 10); + stateManager = new PipelineStateManager(new OzoneConfiguration()); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1); + provider = new MockRatisPipelineProvider(nodeManager, + stateManager, conf); + } + + private void createPipelineAndAssertions( + HddsProtos.ReplicationFactor factor) throws IOException { + Pipeline pipeline = provider.create(factor); + stateManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + Pipeline pipeline1 = provider.create(factor); + stateManager.addPipeline(pipeline1); + nodeManager.addPipeline(pipeline1); + Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline1.getFactor(), factor); + Assert.assertEquals(pipeline1.getPipelineState(), + Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); + } + + @Test + public void testCreatePipelineWithFactor() throws IOException { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + Pipeline pipeline = provider.create(factor); + stateManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + + factor = HddsProtos.ReplicationFactor.ONE; + Pipeline pipeline1 = provider.create(factor); + stateManager.addPipeline(pipeline1); + nodeManager.addPipeline(pipeline1); + Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline1.getFactor(), factor); + Assert.assertEquals(pipeline1.getPipelineState(), + Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); + } + + @Test + public void testCreatePipelineWithFactorThree() throws IOException { + createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE); + } + + @Test + public void testCreatePipelineWithFactorOne() throws IOException { + createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE); + } + + private List<DatanodeDetails> createListOfNodes(int nodeCount) { + List<DatanodeDetails> nodes = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodes.add(TestUtils.randomDatanodeDetails()); + } + return nodes; + } + + @Test + public void testCreatePipelineWithNodes() { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + Pipeline pipeline = + provider.create(factor, createListOfNodes(factor.getNumber())); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals( + pipeline.getPipelineState(), Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + + factor = 
HddsProtos.ReplicationFactor.ONE; + pipeline = provider.create(factor, createListOfNodes(factor.getNumber())); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + } + + @Test + public void testCreatePipelinesDnExclude() throws IOException { + List<DatanodeDetails> allHealthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + int totalHealthyNodesCount = allHealthyNodes.size(); + + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + + List<DatanodeDetails> closePipelineDns = new ArrayList<>(); + for (int i = 0; i < totalHealthyNodesCount/3; i++) { + List<DatanodeDetails> pipelineDns = allHealthyNodes + .subList(3 * i, 3 * (i + 1)); + + Pipeline.PipelineState state; + if (i % 2 == 0) { + state = Pipeline.PipelineState.OPEN; + } else { + state = Pipeline.PipelineState.CLOSED; + closePipelineDns.addAll(pipelineDns); + } + + Pipeline openPipeline = Pipeline.newBuilder() + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(factor) + .setNodes(pipelineDns) + .setState(state) + .setId(PipelineID.randomId()) + .build(); + + + stateManager.addPipeline(openPipeline); + nodeManager.addPipeline(openPipeline); + } + + Pipeline pipeline = provider.create(factor); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getPipelineState(), + Pipeline.PipelineState.OPEN); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + List<DatanodeDetails> pipelineNodes = pipeline.getNodes(); + + // Since we have only 10 DNs, at least 1 pipeline node should have been + // from the closed pipeline DN list. 
+ Assert.assertTrue(pipelineNodes.parallelStream().filter( + closePipelineDns::contains).count() > 0); + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 2a486b1..ee93dd4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; @@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; @@ -58,6 +60,7 @@ public class TestSCMPipelineManager { @Before public void setUp() throws Exception { conf = new OzoneConfiguration(); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1); testDir = GenericTestUtils .getTestDir(TestSCMPipelineManager.class.getSimpleName()); conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); @@ -253,10 +256,10 @@ public class TestSCMPipelineManager { pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert.fail(); - } catch (InsufficientDatanodesException idEx) { - Assert.assertEquals( - "Cannot create pipeline of factor 3 using 1 nodes.", - idEx.getMessage()); + } catch (SCMException ioe) { + // pipeline creation failed this time. + Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE, + ioe.getResult()); } metrics = getMetrics( @@ -266,7 +269,7 @@ public class TestSCMPipelineManager { numPipelineCreateFailed = getLongCounter( "NumPipelineCreationFailed", metrics); - Assert.assertTrue(numPipelineCreateFailed == 0); + Assert.assertTrue(numPipelineCreateFailed == 1); } @Test diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java index 459a67a..baeee6a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -57,8 +57,11 @@ public class TestSCMRestart { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + int numOfNodes = 4; cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(4) + .setNumDatanodes(numOfNodes) + // allow only one FACTOR THREE pipeline. 
+ .setTotalPipelineNumLimit(numOfNodes + 1) .setHbInterval(1000) .setHbProcessorInterval(1000) .build(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java index 7cfd555..09c633d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java @@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder; import java.util.List; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; import static org.junit.Assert.fail; /** @@ -62,6 +63,8 @@ public class TestSCMSafeModeWithPipelineRules { true); conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s"); conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s"); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 50); + clusterBuilder = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(numDatanodes) .setHbInterval(1000) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 59cef37..5784196 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -238,6 +238,7 @@ public interface MiniOzoneCluster { protected static final int DEFAULT_HB_INTERVAL_MS = 1000; protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100; protected static final int ACTIVE_OMS_NOT_SET = -1; + protected static final int DEFAULT_PIPELIME_LIMIT = 3; protected final OzoneConfiguration conf; protected String path; @@ -265,6 +266,7 @@ public interface MiniOzoneCluster { protected int numOfDatanodes = 1; protected boolean startDataNodes = true; protected CertificateClient certClient; + protected int pipelineNumLimit = DEFAULT_PIPELIME_LIMIT; protected Builder(OzoneConfiguration conf) { this.conf = conf; @@ -352,6 +354,16 @@ public interface MiniOzoneCluster { } /** + * Sets the total number of pipelines to create. + * @param val number of pipelines + * @return MiniOzoneCluster.Builder + */ + public Builder setTotalPipelineNumLimit(int val) { + pipelineNumLimit = val; + return this; + } + + /** * Sets the number of HeartBeat Interval of Datanodes, the value should be * in MilliSeconds. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 8b2fc92..8c4b74a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -493,6 +493,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { streamBufferMaxSize.get(), streamBufferSizeUnit.get()); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(), streamBufferSizeUnit.get()); + // MiniOzoneCluster should have global pipeline upper limit. 
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, + pipelineNumLimit == DEFAULT_PIPELIME_LIMIT ? + 2 * numOfDatanodes : pipelineNumLimit); configureTrace(); } @@ -504,7 +508,7 @@ * Creates a new StorageContainerManager instance. * * @return {@link StorageContainerManager} - * + * * @throws IOException */ StorageContainerManager createSCM() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java index fd2cea3..1eef382 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java @@ -81,6 +81,7 @@ public class Test2WayCommitInRatis { conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) + .setTotalPipelineNumLimit(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 399b977..444f362 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -84,6 +84,7 @@ public class TestBlockOutputStream { StorageUnit.MB); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) + .setTotalPipelineNumLimit(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 8649837..3bcd81f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; @@ -89,9 +90,11 @@ public class TestBlockOutputStreamWithFailures { conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7) .setTotalPipelineNumLimit(10).setBlockSize(blockSize) - .setBlockSize(blockSize).setChunkSize(chunkSize) - .setStreamBufferFlushSize(flushSize) + .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) .setStreamBufferSizeUnit(StorageUnit.BYTES).build(); cluster.waitForClusterToBeReady(); diff --git 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 41ebb63..cd75f81 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -95,6 +95,7 @@ public class TestCommitWatcher {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 0886d26..a8f61f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -56,6 +56,7 @@ import java.util.function.Predicate;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 
 /**
  * Tests delete key operation with a slow follower in the datanode
@@ -99,10 +100,12 @@ public class TestContainerReplicationEndToEnd {
         1000, TimeUnit.SECONDS);
     conf.setLong("hdds.scm.replication.thread.interval",
         containerReportInterval);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+            .setTotalPipelineNumLimit(6).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.getStorageContainerManager().getReplicationManager().start();
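The values picked in the TestContainerReplicationEndToEnd hunk above follow a simple capacity bound: a RATIS factor-THREE pipeline occupies three datanode slots, and each datanode offers at most max.pipeline.engagement slots. A sketch of that arithmetic; the helper name is mine, not from the patch:

    // Upper bound on concurrent factor-THREE pipelines:
    // floor(datanodes * engagement / 3), three slots per pipeline.
    final class PipelineCapacitySketch {
      static int maxFactorThreePipelines(int datanodes, int engagement) {
        return (datanodes * engagement) / 3;
      }

      public static void main(String[] args) {
        // TestContainerReplicationEndToEnd: 4 datanodes, engagement 2
        // -> at most 2 pipelines, well under its total limit of 6.
        System.out.println(maxFactorThreePipelines(4, 2)); // 2
      }
    }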
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 82c4910..3b806dd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@ public class TestContainerStateMachine {
     baseDir.mkdirs();
 
     conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
-    //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624..644469e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower {
   private static String bucketName;
   private static String path;
   private static XceiverClientManager xceiverClientManager;
+  private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -111,10 +112,13 @@ public class TestDeleteWithSlowFollower {
         1000, TimeUnit.SECONDS);
     conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
         1, TimeUnit.SECONDS);
-    conf.setQuietMode(false);
-    cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+    int numOfDatanodes = 3;
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(numOfDatanodes)
+        .setTotalPipelineNumLimit(
+            numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+        .setHbInterval(100)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -176,7 +180,7 @@ public class TestDeleteWithSlowFollower {
     cluster.getStorageContainerManager().getPipelineManager()
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
-    Assert.assertTrue(pipelineList.size() == 1);
+    Assert.assertTrue(pipelineList.size() >= FACTOR_THREE_PIPELINE_COUNT);
     Pipeline pipeline = pipelineList.get(0);
     for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
       if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
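TestDeleteWithSlowFollower's assertion is deliberately relaxed from equality to a floor: once datanodes may join several pipelines, SCM is free to create more factor-THREE pipelines than the single one the old test expected. The pattern, restated as a self-contained helper using the same SCM APIs the test calls; the class and method names are illustrative:

    import java.util.List;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
    import org.junit.Assert;

    final class PipelineAssertSketch {
      // Assert a lower bound, not equality: multi-raft placement may
      // create extra factor-THREE pipelines beyond the test's minimum.
      static void assertAtLeastFactorThree(PipelineManager manager,
          int expectedMinimum) {
        List<Pipeline> pipelines = manager.getPipelines(
            HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE);
        Assert.assertTrue(pipelines.size() >= expectedMinimum);
      }
    }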
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index edb796b..7b6d555 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,7 @@ public class TestFailureHandlingByClient {
         1, TimeUnit.SECONDS);
     conf.setBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
     conf.setQuietMode(false);
     conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -105,7 +107,7 @@ public class TestFailureHandlingByClient {
         Collections.singleton(HddsUtils.getHostName(conf))).get(0), "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10).build();
+        .setNumDatanodes(10).setTotalPipelineNumLimit(15).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e..75af061 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+        .setTotalPipelineNumLimit(5).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289..449ff3f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -81,6 +81,7 @@ public class TestKeyInputStream {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(5)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
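Two knobs recur throughout these test changes: ozone.scm.datanode.max.pipeline.engagement caps how many pipelines a single datanode may join, and ozone.scm.pipeline.number.limit caps the cluster-wide total (the value setTotalPipelineNumLimit() feeds, as shown earlier). A sketch of setting both directly, with the values TestFailureHandlingByClient uses above:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    final class PipelineConfSketch {
      static OzoneConfiguration limitedPipelineConf() {
        OzoneConfiguration conf = new OzoneConfiguration();
        // At most 2 pipelines may include any one datanode.
        conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
        // At most 15 pipelines cluster-wide.
        conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 15);
        return conf;
      }
    }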
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 9666247..64047ba 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -47,8 +47,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -87,10 +86,13 @@ public class TestMultiBlockWritesWithDnFailures {
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(datanodes).build();
+        .setNumDatanodes(datanodes)
+        .setTotalPipelineNumLimit(0)
+        .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5f6d494..0bc94d0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -91,6 +91,7 @@ public class TestOzoneClientRetriesOnException {
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 5ef143c..db0fa6b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -165,6 +165,7 @@ public abstract class TestOzoneRpcClientAbstract {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(10)
         .setScmId(scmId)
         .build();
     cluster.waitForClusterToBeReady();
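Note that the TestMultiBlockWritesWithDnFailures hunk above passes 0 to setTotalPipelineNumLimit(). Read together with the "setting to zero ... doesn't take effect" comment this patch adds in ScmConfigKeys, 0 appears to disable the cluster-wide cap and leave only the per-datanode engagement limit in force; that reading is an inference, not stated in the patch. A sketch under that assumption:

    // Sketch, assuming a limit of 0 means "no cluster-wide pipeline cap".
    static MiniOzoneCluster startUncapped(OzoneConfiguration conf)
        throws Exception {
      MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
          .setNumDatanodes(6)          // hypothetical; the test uses a field
          .setTotalPipelineNumLimit(0) // engagement limit still applies
          .build();
      cluster.waitForClusterToBeReady();
      return cluster;
    }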
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index a5d601e..9b7ed70 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -56,6 +56,7 @@ import java.util.concurrent.TimeoutException;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
@@ -92,10 +93,12 @@ public class TestWatchForCommit {
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c..3cafb7d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test container closing.
  */
@@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline {
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(10)
+        .setTotalPipelineNumLimit(15)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807..e677544 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(4)
+        .setTotalPipelineNumLimit(10)
         .build();
     cluster.waitForClusterToBeReady();
     metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
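The capacity sketch from earlier also explains the TestCloseContainerByPipeline settings above: 10 datanodes at engagement 2 support at most floor(10 * 2 / 3) = 6 factor-THREE pipelines, so the total limit of 15 never binds there. TestQueryNode, in the next hunk, sizes its limit the same defensive way, at roughly 1.5x the datanode count (integer division rounds down):

    // Reusing the hypothetical PipelineCapacitySketch helper from above:
    int bound = PipelineCapacitySketch.maxFactorThreePipelines(10, 2); // 6
    // 6 <= 15, so TestCloseContainerByPipeline's total limit has headroom.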
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index ecc2b3e..a882dcd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -76,9 +77,11 @@ public class TestQueryNode {
     conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes)
+        .setTotalPipelineNumLimit(numOfDatanodes + numOfDatanodes / 2)
         .build();
     cluster.waitForClusterToBeReady();
     scmClient = new ContainerOperationClient(conf);
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index fcabc67..b3bbe3b 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -97,6 +97,7 @@ public class TestOzoneFsHAURLs {
     conf.setTimeDuration(
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId(clusterId);
@@ -106,6 +107,8 @@ public class TestOzoneFsHAURLs {
     // Start the cluster
     cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setClusterId(clusterId)
         .setScmId(scmId)
         .setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822..3e1c826 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(5).build();
+        .setNumDatanodes(5).setTotalPipelineNumLimit(8).build();
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab6..bd30d4e 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public class TestFreonWithPipelineDestroy {
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
       .setNumDatanodes(3)
+      .setTotalPipelineNumLimit(8)
       .build();
     cluster.waitForClusterToBeReady();
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org