This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-1569
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 449731b2ef5612f584e1b8e932ab64020c8f5dbd
Author: timmycheng <timmych...@tencent.com>
AuthorDate: Fri Sep 27 20:10:42 2019 +0800

    Handle ONE factor and THREE factor differently during pipeline allocation.
    Also add consideration for CLOSED pipelines to join allocation.
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  7 +++
 .../common/src/main/resources/ozone-default.xml    | 13 +++-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  4 ++
 .../scm/pipeline/BackgroundPipelineCreator.java    |  1 +
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 36 +++++++----
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 54 ++++++++++++++--
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  4 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 44 ++++++++++---
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |  8 +++
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  6 +-
 .../TestRatisPipelineCreateAndDestroy.java         | 19 ++++--
 .../scm/pipeline/TestRatisPipelineProvider.java    | 72 +++++++++-------------
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |  5 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |  4 ++
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  | 15 +++--
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  3 +
 .../ozone/client/rpc/Test2WayCommitInRatis.java    |  1 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |  5 +-
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |  1 +
 .../rpc/TestContainerReplicationEndToEnd.java      |  7 ++-
 .../client/rpc/TestContainerStateMachine.java      |  5 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     | 11 ++--
 .../client/rpc/TestFailureHandlingByClient.java    |  4 +-
 .../client/rpc/TestHybridPipelineOnDatanode.java   |  3 +-
 .../ozone/client/rpc/TestKeyInputStream.java       |  1 +
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  8 ++-
 .../rpc/TestOzoneClientRetriesOnException.java     |  1 +
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  1 +
 .../ozone/client/rpc/TestWatchForCommit.java       |  5 +-
 .../TestCloseContainerByPipeline.java              |  5 ++
 .../TestSCMContainerPlacementPolicyMetrics.java    |  1 +
 .../hadoop/ozone/scm/node/TestQueryNode.java       |  3 +
 .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java  |  3 +
 .../hadoop/ozone/freon/TestDataValidate.java       |  2 +-
 .../ozone/freon/TestFreonWithPipelineDestroy.java  |  1 +
 35 files changed, 257 insertions(+), 106 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index fe51f51..05fb0a6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -320,6 +320,13 @@ public final class ScmConfigKeys {
   // Setting to zero by default means this limit doesn't take effect.
   public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
 
+  // Upper limit for how many pipelines can be created.
+  // Only for test purpose now.
+  public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+      "ozone.scm.datanode.pipeline.number.limit";
+  // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0; + public static final String OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY = "ozone.scm.keyvalue.container.deletion-choosing.policy"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index a038047..1fdd8c0 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -838,10 +838,17 @@ </description> </property> <property> - <name>ozone.scm.datanode.max.pipeline.engagement</name> - <value>5</value> + <name>ozone.scm.datanode.max.pipeline.engagement</name> + <value>0</value> + <tag>OZONE, SCM, PIPELINE</tag> + <description>Max number of pipelines per datanode can be engaged in. + </description> + </property> + <property> + <name>ozone.scm.datanode.pipeline.number.limit</name> + <value>0</value> <tag>OZONE, SCM, PIPELINE</tag> - <description>Max number of pipelines per datanode can be engaged in. + <description>Upper limit for how many pipelines can be created in SCM. </description> </property> <property> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index a0a7222..1230227 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -188,6 +188,10 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { // TODO: #CLUTIL Remove creation logic when all replication types and // factors are handled by pipeline creator pipeline = pipelineManager.createPipeline(type, factor); + } catch (SCMException se) { + LOG.warn("Pipeline creation failed for type:{} factor:{}. " + + "Datanodes may be used up.", type, factor, se); + break; } catch (IOException e) { LOG.warn("Pipeline creation failed for type:{} factor:{}. 
Retrying " + "get pipelines call once.", type, factor, e); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java index 6873566..6952f74 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java @@ -96,6 +96,7 @@ class BackgroundPipelineCreator { if (scheduler.isClosed()) { break; } + pipelineManager.createPipeline(type, factor); } catch (IOException ioe) { break; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java index df46fad..e41675d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java @@ -52,6 +52,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { static final Logger LOG = LoggerFactory.getLogger(PipelinePlacementPolicy.class); private final NodeManager nodeManager; + private final PipelineStateManager stateManager; private final Configuration conf; private final int heavyNodeCriteria; @@ -59,15 +60,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { * Constructs a pipeline placement with considering network topology, * load balancing and rack awareness. * - * @param nodeManager Node Manager + * @param nodeManager NodeManager + * @param stateManager PipelineStateManager * @param conf Configuration */ - public PipelinePlacementPolicy( - final NodeManager nodeManager, final Configuration conf) { + public PipelinePlacementPolicy(final NodeManager nodeManager, + final PipelineStateManager stateManager, final Configuration conf) { super(nodeManager, conf); this.nodeManager = nodeManager; this.conf = conf; - heavyNodeCriteria = conf.getInt( + this.stateManager = stateManager; + this.heavyNodeCriteria = conf.getInt( ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); } @@ -76,16 +79,27 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy { * Returns true if this node meets the criteria. * * @param datanodeDetails DatanodeDetails + * @param nodesRequired nodes required count * @return true if we have enough space. */ @VisibleForTesting - boolean meetCriteria(DatanodeDetails datanodeDetails) { + boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) { if (heavyNodeCriteria == 0) { // no limit applied. return true; } + // Datanodes from pipeline in some states can also be considered available + // for pipeline allocation. Thus the number of these pipeline shall be + // deducted from total heaviness calculation. 
+    int pipelineNumDeductable = (int)stateManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.valueOf(nodesRequired),
+        Pipeline.PipelineState.CLOSED)
+        .stream().filter(
+            p -> nodeManager.getPipelines(datanodeDetails).contains(p.getId()))
+        .count();
     boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
-        < heavyNodeCriteria);
+        - pipelineNumDeductable) < heavyNodeCriteria;
     if (!meet) {
       LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
           "datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " +
@@ -134,13 +148,14 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
 
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
-    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        meetCriteria(d)).collect(Collectors.toList());
+    List<DatanodeDetails> healthyList = healthyNodes.stream()
+        .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+        .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
       msg = String.format("Unable to find enough nodes that meet " +
           "the criteria that cannot engage in more than %d pipelines." +
-          " Nodes required: %d Found: %d, healthy nodes count in" +
+          " Nodes required: %d Found: %d, healthy nodes count in " +
           "NodeManager: %d.",
           heavyNodeCriteria, nodesRequired, healthyList.size(),
           initialHealthyNodesCount);
@@ -173,8 +188,7 @@
     // Randomly picks nodes when all nodes are equal or factor is ONE.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
-    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())
-        || nodesRequired == HddsProtos.ReplicationFactor.ONE.getNumber()) {
+    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
       return super.getResultSet(nodesRequired, healthyNodes);
     } else {
       // Since topology and rack awareness are available, picks nodes
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 6a51957..f6b80ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.io.MultipleIOException;
@@ -41,14 +43,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import
java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Implements Api for creating ratis pipelines. @@ -85,13 +86,54 @@ public class RatisPipelineProvider implements PipelineProvider { this.stateManager = stateManager; this.conf = conf; this.tlsConfig = tlsConfig; - this.placementPolicy = new PipelinePlacementPolicy(nodeManager, conf); + this.placementPolicy = + new PipelinePlacementPolicy(nodeManager, stateManager, conf); + } + + private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor) + throws SCMException { + Set<DatanodeDetails> dnsUsed = new HashSet<>(); + stateManager.getPipelines(ReplicationType.RATIS, factor) + .stream().filter( + p -> p.getPipelineState().equals(PipelineState.OPEN) || + p.getPipelineState().equals(PipelineState.DORMANT) || + p.getPipelineState().equals(PipelineState.ALLOCATED)) + .forEach(p -> dnsUsed.addAll(p.getNodes())); + + // Get list of healthy nodes + List<DatanodeDetails> dns = nodeManager + .getNodes(HddsProtos.NodeState.HEALTHY) + .parallelStream() + .filter(dn -> !dnsUsed.contains(dn)) + .limit(factor.getNumber()) + .collect(Collectors.toList()); + if (dns.size() < factor.getNumber()) { + String e = String + .format("Cannot create pipeline of factor %d using %d nodes." + + " Used %d nodes. Healthy nodes %d", factor.getNumber(), + dns.size(), dnsUsed.size(), + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size()); + throw new SCMException(e, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return dns; } @Override public Pipeline create(ReplicationFactor factor) throws IOException { - List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null, - null, factor.getNumber(), 0); + List<DatanodeDetails> dns; + + switch(factor) { + case ONE: + dns = pickNodesNeverUsed(ReplicationFactor.ONE); + break; + case THREE: + dns = placementPolicy.chooseDatanodes(null, + null, factor.getNumber(), 0); + break; + default: + throw new IllegalStateException("Unknown factor: " + factor.name()); + } Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index 777a0b0..21c4fbf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -64,8 +64,8 @@ public final class RatisPipelineUtils { try { destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig); } catch (IOException e) { - LOG.warn("Pipeline destroy failed for pipeline={} dn={}", - pipeline.getId(), dn); + LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}", + pipeline.getId(), dn, e.getMessage()); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index a927d56..80c934f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import 
org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -54,10 +55,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.hdds.scm - .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm - .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB; /** @@ -84,6 +81,8 @@ public class SCMPipelineManager implements PipelineManager { // Pipeline Manager MXBean private ObjectName pmInfoBean; private GrpcTlsConfig grpcTlsConfig; + private int pipelineNumberLimit; + private int heavyNodeCriteria; public SCMPipelineManager(Configuration conf, NodeManager nodeManager, EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig) @@ -97,8 +96,8 @@ public class SCMPipelineManager implements PipelineManager { scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); this.backgroundPipelineCreator = new BackgroundPipelineCreator(this, scheduler, conf); - int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, - OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB, + ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT); final File metaDir = ServerUtils.getScmDbDir(conf); final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB); this.pipelineStore = @@ -115,6 +114,12 @@ public class SCMPipelineManager implements PipelineManager { "SCMPipelineManagerInfo", this); initializePipelineState(); this.grpcTlsConfig = grpcTlsConfig; + this.pipelineNumberLimit = conf.getInt( + ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, + ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT); + this.heavyNodeCriteria = conf.getInt( + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, + ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); } public PipelineStateManager getStateManager() { @@ -147,10 +152,33 @@ public class SCMPipelineManager implements PipelineManager { } } + private boolean exceedPipelineNumberLimit(ReplicationFactor factor) { + if (heavyNodeCriteria > 0 && factor == ReplicationFactor.THREE) { + return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() - + stateManager.getPipelines(ReplicationType.RATIS, factor, + Pipeline.PipelineState.CLOSED).size()) >= heavyNodeCriteria * + nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY); + } + + if (pipelineNumberLimit > 0) { + return (stateManager.getPipelines(ReplicationType.RATIS).size() - + stateManager.getPipelines(ReplicationType.RATIS, + Pipeline.PipelineState.CLOSED).size()) >= pipelineNumberLimit; + } + + return false; + } + @Override public synchronized Pipeline createPipeline( ReplicationType type, ReplicationFactor factor) throws IOException { lock.writeLock().lock(); + if (type == ReplicationType.RATIS && exceedPipelineNumberLimit(factor)) { + lock.writeLock().unlock(); + throw new SCMException("Pipeline number meets the limit: " + + pipelineNumberLimit, + SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES); + } try { Pipeline pipeline = pipelineFactory.create(type, factor); pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(), @@ 
-160,8 +188,6 @@ public class SCMPipelineManager implements PipelineManager { metrics.incNumPipelineCreated(); metrics.createPerPipelineMetrics(pipeline); return pipeline; - } catch (InsufficientDatanodesException idEx) { - throw idEx; } catch (IOException ex) { metrics.incNumPipelineCreationFailed(); LOG.error("Pipeline creation failed.", ex); @@ -173,7 +199,7 @@ public class SCMPipelineManager implements PipelineManager { @Override public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor, - List<DatanodeDetails> nodes) { + List<DatanodeDetails> nodes) { // This will mostly be used to create dummy pipeline for SimplePipelines. // We don't update the metrics for SimplePipelines. lock.writeLock().lock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java index d0f7f6e..1b23036 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java @@ -124,6 +124,14 @@ public final class SCMPipelineMetrics implements MetricsSource { } /** + * Get the number of pipeline created. + * @return number of pipeline + */ + long getNumPipelineCreated() { + return numPipelineCreated.value(); + } + + /** * Increments number of failed pipeline creation count. */ void incNumPipelineCreationFailed() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java index e200d6f..1e34039 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java @@ -51,8 +51,8 @@ public class TestPipelinePlacementPolicy { PIPELINE_PLACEMENT_MAX_NODES_COUNT); conf = new OzoneConfiguration(); conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5); - placementPolicy = - new PipelinePlacementPolicy(nodeManager, conf); + placementPolicy = new PipelinePlacementPolicy( + nodeManager, new PipelineStateManager(conf), conf); } @Test @@ -128,7 +128,7 @@ public class TestPipelinePlacementPolicy { public void testHeavyNodeShouldBeExcluded() throws SCMException{ List<DatanodeDetails> healthyNodes = nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); - int nodesRequired = healthyNodes.size()/2; + int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber(); // only minority of healthy NODES are heavily engaged in pipelines. 
int minorityHeavy = healthyNodes.size()/2 - 1; List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java index 6ace90c..d0afbbe 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -34,6 +35,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; /** @@ -48,9 +50,12 @@ public class TestRatisPipelineCreateAndDestroy { public void init(int numDatanodes) throws Exception { conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, GenericTestUtils.getRandomizedTempPath()); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2); + cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(numDatanodes) - .setHbInterval(1000) + .setPipelineNumber(numDatanodes + numDatanodes/3) + .setHbInterval(2000) .setHbProcessorInterval(1000) .build(); cluster.waitForClusterToBeReady(); @@ -103,7 +108,7 @@ public class TestRatisPipelineCreateAndDestroy { } catch (IOException ioe) { // As now all datanodes are shutdown, they move to stale state, there // will be no sufficient datanodes to create the pipeline. 
- Assert.assertTrue(ioe instanceof InsufficientDatanodesException); + Assert.assertTrue(ioe instanceof SCMException); } // make sure pipelines is destroyed @@ -116,9 +121,13 @@ public class TestRatisPipelineCreateAndDestroy { for (Pipeline pipeline : pipelines) { pipelineManager.finalizeAndDestroyPipeline(pipeline, false); } - // make sure pipelines is created after node start - pipelineManager.triggerPipelineCreation(); - waitForPipelines(1); + + if (cluster.getStorageContainerManager() + .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) > 0) { + // make sure pipelines is created after node start + pipelineManager.triggerPipelineCreation(); + waitForPipelines(1); + } } private void waitForPipelines(int numPipelines) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 0a8c5ad..7526575 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -137,52 +137,38 @@ public class TestRatisPipelineProvider { @Test public void testCreatePipelinesDnExclude() throws IOException { - // We have 10 DNs in MockNodeManager. - // Use up first 3 DNs for an open pipeline. - List<DatanodeDetails> openPiplineDns = nodeManager.getAllNodes() - .subList(0, 3); + List<DatanodeDetails> allHealthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + int totalHealthyNodesCount = allHealthyNodes.size(); + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; - Pipeline openPipeline = Pipeline.newBuilder() - .setType(HddsProtos.ReplicationType.RATIS) - .setFactor(factor) - .setNodes(openPiplineDns) - .setState(Pipeline.PipelineState.OPEN) - .setId(PipelineID.randomId()) - .build(); - - stateManager.addPipeline(openPipeline); - nodeManager.addPipeline(openPipeline); - for (DatanodeDetails node : openPipeline.getNodes()) { - System.out.println("open pipeline contains " + node.getUuid()); + List<DatanodeDetails> closePipelineDns = new ArrayList<>(); + for (int i = 0; i < totalHealthyNodesCount/3; i++) { + List<DatanodeDetails> pipelineDns = allHealthyNodes + .subList(3 * i, 3 * (i + 1)); + + Pipeline.PipelineState state; + if (i % 2 == 0) { + state = Pipeline.PipelineState.OPEN; + } else { + state = Pipeline.PipelineState.CLOSED; + closePipelineDns.addAll(pipelineDns); + } + + Pipeline openPipeline = Pipeline.newBuilder() + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(factor) + .setNodes(pipelineDns) + .setState(state) + .setId(PipelineID.randomId()) + .build(); + + + stateManager.addPipeline(openPipeline); + nodeManager.addPipeline(openPipeline); } - // Use up next 3 DNs also for an open pipeline. - List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes() - .subList(3, 6); - Pipeline anotherOpenPipeline = Pipeline.newBuilder() - .setType(HddsProtos.ReplicationType.RATIS) - .setFactor(factor) - .setNodes(moreOpenPiplineDns) - .setState(Pipeline.PipelineState.OPEN) - .setId(PipelineID.randomId()) - .build(); - stateManager.addPipeline(anotherOpenPipeline); - nodeManager.addPipeline(anotherOpenPipeline); - - // Use up next 3 DNs also for a closed pipeline. 
- List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes() - .subList(6, 9); - Pipeline anotherClosedPipeline = Pipeline.newBuilder() - .setType(HddsProtos.ReplicationType.RATIS) - .setFactor(factor) - .setNodes(closedPiplineDns) - .setState(Pipeline.PipelineState.CLOSED) - .setId(PipelineID.randomId()) - .build(); - stateManager.addPipeline(anotherClosedPipeline); - nodeManager.addPipeline(anotherClosedPipeline); - Pipeline pipeline = provider.create(factor); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); @@ -194,6 +180,6 @@ public class TestRatisPipelineProvider { // Since we have only 10 DNs, at least 1 pipeline node should have been // from the closed pipeline DN list. Assert.assertTrue(pipelineNodes.parallelStream().filter( - closedPiplineDns::contains).count() > 0); + closePipelineDns::contains).count() > 0); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java index 459a67a..1af2f74 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -57,8 +57,11 @@ public class TestSCMRestart { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + int numOfNodes = 4; cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(4) + .setNumDatanodes(numOfNodes) + // allow only one FACTOR THREE pipeline. + .setPipelineNumber(numOfNodes + 1) .setHbInterval(1000) .setHbProcessorInterval(1000) .build(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java index 7cfd555..1caa302 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java @@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder; import java.util.List; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; import static org.junit.Assert.fail; /** @@ -62,8 +63,11 @@ public class TestSCMSafeModeWithPipelineRules { true); conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s"); conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s"); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1000); + clusterBuilder = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(numDatanodes) + .setPipelineNumber(numDatanodes + numDatanodes/3) .setHbInterval(1000) .setHbProcessorInterval(1000); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 19c1406..4fe1701 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -36,8 +36,6 @@ import java.util.Optional; import java.util.UUID; import 
java.util.concurrent.TimeoutException; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; - /** * Interface used for MiniOzoneClusters. */ @@ -268,11 +266,10 @@ public interface MiniOzoneCluster { protected int numOfDatanodes = 1; protected boolean startDataNodes = true; protected CertificateClient certClient; + protected int pipelineNumber = 3; protected Builder(OzoneConfiguration conf) { this.conf = conf; - // MiniOzoneCluster doesn't have pipeline engagement limit. - conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0); setClusterId(UUID.randomUUID().toString()); } @@ -357,6 +354,16 @@ public interface MiniOzoneCluster { } /** + * Sets the total number of pipelines to create. + * @param val number of pipelines + * @return MiniOzoneCluster.Builder + */ + public Builder setPipelineNumber(int val) { + pipelineNumber = val; + return this; + } + + /** * Sets the number of HeartBeat Interval of Datanodes, the value should be * in MilliSeconds. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index ac76482..39b2582 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -494,6 +494,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { streamBufferMaxSize.get(), streamBufferSizeUnit.get()); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(), streamBufferSizeUnit.get()); + // MiniOzoneCluster should have global pipeline upper limit. + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, + pipelineNumber == 3 ? 
2 * numOfDatanodes : pipelineNumber); configureTrace(); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java index cf570d2..ea648c9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java @@ -81,6 +81,7 @@ public class Test2WayCommitInRatis { conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) + .setPipelineNumber(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 8649837..9088497 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.XceiverClientRatis; @@ -89,8 +90,10 @@ public class TestBlockOutputStreamWithFailures { conf.setQuietMode(false); conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7) - .setBlockSize(blockSize).setChunkSize(chunkSize) + .setPipelineNumber(10).setBlockSize(blockSize).setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) .setStreamBufferMaxSize(maxFlushSize) .setStreamBufferSizeUnit(StorageUnit.BYTES).build(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java index ea51900..ff9fad4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java @@ -96,6 +96,7 @@ public class TestCommitWatcher { StorageUnit.MB); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) + .setPipelineNumber(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java index 0886d26..865e0b5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java @@ -54,8 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; /** * Tests delete key operation with a slow follower in the datanode @@ -99,10 +98,12 @@ public class TestContainerReplicationEndToEnd { 1000, TimeUnit.SECONDS); conf.setLong("hdds.scm.replication.thread.interval", containerReportInterval); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2); conf.setQuietMode(false); cluster = - MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200) + MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4) + .setPipelineNumber(6).setHbInterval(200) .build(); cluster.waitForClusterToBeReady(); cluster.getStorageContainerManager().getReplicationManager().start(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java index 19a1707..37b8a5f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java @@ -52,8 +52,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; /** * Tests the containerStateMachine failure handling. @@ -82,7 +81,7 @@ public class TestContainerStateMachine { baseDir.mkdirs(); conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true); - // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); + // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true); conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java index 30c2624..00556a8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java @@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower { private static String bucketName; private static String path; private static XceiverClientManager xceiverClientManager; + private static final int FACTOR_THREE_PIPELINE_COUNT = 1; /** * Create a MiniDFSCluster for testing. 
@@ -111,10 +112,12 @@ public class TestDeleteWithSlowFollower { 1000, TimeUnit.SECONDS); conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); - conf.setQuietMode(false); - cluster = - MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100) + int numOfDatanodes = 3; + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(numOfDatanodes) + .setPipelineNumber(numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT) + .setHbInterval(100) .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key @@ -176,7 +179,7 @@ public class TestDeleteWithSlowFollower { cluster.getStorageContainerManager().getPipelineManager() .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); - Assert.assertTrue(pipelineList.size() == 1); + Assert.assertEquals(FACTOR_THREE_PIPELINE_COUNT, pipelineList.size()); Pipeline pipeline = pipelineList.get(0); for (HddsDatanodeService dn : cluster.getHddsDatanodes()) { if (ContainerTestHelper.isRatisFollower(dn, pipeline)) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index edb796b..0368323 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -97,6 +98,7 @@ public class TestFailureHandlingByClient { 1, TimeUnit.SECONDS); conf.setBoolean( OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true); + conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2); conf.setQuietMode(false); conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, @@ -105,7 +107,7 @@ public class TestFailureHandlingByClient { Collections.singleton(HddsUtils.getHostName(conf))).get(0), "/rack1"); cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(10).build(); + .setNumDatanodes(10).setPipelineNumber(15).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java index 47a716e..84649e3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java @@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build(); + cluster = 
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3) + .setPipelineNumber(5).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java index fa8a289..666264c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java @@ -81,6 +81,7 @@ public class TestKeyInputStream { StorageUnit.MB); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) + .setPipelineNumber(5) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java index 9666247..6dbae6a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java @@ -47,8 +47,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; /** * Tests MultiBlock Writes with Dn failures by Ozone Client. 
@@ -87,10 +86,13 @@ public class TestMultiBlockWritesWithDnFailures { conf.setTimeDuration( OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, 1, TimeUnit.SECONDS); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(datanodes).build(); + .setNumDatanodes(datanodes) + .setPipelineNumber(0) + .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 5f6d494..9e7e3c0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -91,6 +91,7 @@ public class TestOzoneClientRetriesOnException { conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) + .setPipelineNumber(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index d91f739..4710ada 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -164,6 +164,7 @@ public abstract class TestOzoneRpcClientAbstract { static void startCluster(OzoneConfiguration conf) throws Exception { cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) + .setPipelineNumber(10) .setScmId(scmId) .build(); cluster.waitForClusterToBeReady(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index 9b59349..fa89f5b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -53,8 +53,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*; /** * This class verifies the watchForCommit Handling by xceiverClient. 
@@ -92,10 +91,12 @@ public class TestWatchForCommit { conf.setTimeDuration( OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, 1, TimeUnit.SECONDS); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5); conf.setQuietMode(false); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(7) + .setPipelineNumber(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) .setStreamBufferFlushSize(flushSize) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java index b676e1c..763f639 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java @@ -52,6 +52,8 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; + /** * Test container closing. */ @@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline { public static void init() throws Exception { conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1"); + conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2); + cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(10) + .setPipelineNumber(15) .build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java index 536d807..191589a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java @@ -82,6 +82,7 @@ public class TestSCMContainerPlacementPolicyMetrics { "/rack1"); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(4) + .setPipelineNumber(10) .build(); cluster.waitForClusterToBeReady(); metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java index c9b8c89..618212a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.scm.node; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -77,9 +78,11 @@ public class TestQueryNode { conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + 
conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes)
+        .setPipelineNumber(numOfDatanodes + numOfDatanodes/2)
         .build();
     cluster.waitForClusterToBeReady();
     scmClient = new ContainerOperationClient(cluster
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index ab35191..bd7173c 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -99,6 +99,7 @@ public class TestOzoneFsHAURLs {
     conf.setTimeDuration(
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId(clusterId);
@@ -108,6 +109,8 @@ public class TestOzoneFsHAURLs {
     // Start the cluster
     cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setNumDatanodes(7)
+        .setPipelineNumber(10)
         .setClusterId(clusterId)
         .setScmId(scmId)
         .setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822..eb19fe7 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(5).build();
+        .setNumDatanodes(5).setPipelineNumber(8).build();
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab6..cc922f2 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public class TestFreonWithPipelineDestroy {
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
       .setNumDatanodes(3)
+      .setPipelineNumber(8)
       .build();
     cluster.waitForClusterToBeReady();
   }
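Note on the change above: the commit adds two creation-time caps, a per-datanode
engagement limit (ozone.scm.datanode.max.pipeline.engagement) and a global count
limit (ozone.scm.datanode.pipeline.number.limit), and excludes CLOSED pipelines
from both counts, since a closed pipeline is awaiting destruction and no longer
occupies its datanodes. Below is a minimal, self-contained sketch of that
accounting, not the committed SCM code: PipelineLimitSketch, exceedsLimit, and
the plain int parameters are hypothetical stand-ins for counts that the real
SCMPipelineManager reads from PipelineStateManager and NodeManager.

/**
 * Illustrative stand-in (not the committed SCM code) for the two limit
 * checks introduced in this commit. Current pipeline counts are passed in
 * directly; the real code derives them from PipelineStateManager and
 * NodeManager.
 */
public final class PipelineLimitSketch {

  private final int heavyNodeCriteria;   // ozone.scm.datanode.max.pipeline.engagement (0 = off)
  private final int pipelineNumberLimit; // ozone.scm.datanode.pipeline.number.limit (0 = off)

  public PipelineLimitSketch(int heavyNodeCriteria, int pipelineNumberLimit) {
    this.heavyNodeCriteria = heavyNodeCriteria;
    this.pipelineNumberLimit = pipelineNumberLimit;
  }

  /**
   * CLOSED pipelines are subtracted before either comparison, so pipelines
   * that are closed but not yet destroyed do not block new allocations.
   */
  public boolean exceedsLimit(boolean factorThree, int totalPipelines,
      int closedPipelines, int healthyNodeCount) {
    int active = totalPipelines - closedPipelines;
    if (heavyNodeCriteria > 0 && factorThree) {
      // Per-datanode cap, applied cluster-wide for factor THREE.
      return active >= heavyNodeCriteria * healthyNodeCount;
    }
    if (pipelineNumberLimit > 0) {
      // Global cap on the number of Ratis pipelines.
      return active >= pipelineNumberLimit;
    }
    return false; // both limits disabled
  }

  public static void main(String[] args) {
    // 2 pipelines per datanode, no global cap, 3 healthy datanodes.
    PipelineLimitSketch sketch = new PipelineLimitSketch(2, 0);
    // 6 total with 1 CLOSED -> 5 active < 2 * 3, so creation may proceed.
    System.out.println(sketch.exceedsLimit(true, 6, 1, 3)); // false
    // 7 total, none CLOSED -> 7 active >= 6, limit reached.
    System.out.println(sketch.exceedsLimit(true, 7, 0, 3)); // true
  }
}

The factorThree flag mirrors the committed exceedPipelineNumberLimit(): the
per-datanode cap is only applied to factor THREE pipelines, while ONE-factor
pipelines are bounded by the global limit and by pickNodesNeverUsed(), which
skips datanodes already serving an OPEN, DORMANT, or ALLOCATED pipeline of
the same factor.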