Repository: hadoop Updated Branches: refs/heads/trunk 73e3a49eb -> 083b44c13
HDFS-9008. Balancer#Parameters class could use a builder pattern. (Chris Trezzo via mingma) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/083b44c1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/083b44c1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/083b44c1 Branch: refs/heads/trunk Commit: 083b44c136ea5aba660fcd1dddbb2d21513b4456 Parents: 73e3a49 Author: Ming Ma <[email protected]> Authored: Tue Sep 15 10:16:02 2015 -0700 Committer: Ming Ma <[email protected]> Committed: Tue Sep 15 10:16:02 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/server/balancer/Balancer.java | 134 ++++---------- .../server/balancer/BalancerParameters.java | 168 +++++++++++++++++ .../hdfs/server/balancer/TestBalancer.java | 180 ++++++++++--------- .../balancer/TestBalancerWithHANameNodes.java | 4 +- .../TestBalancerWithMultipleNameNodes.java | 26 ++- .../balancer/TestBalancerWithNodeGroup.java | 4 +- 7 files changed, 317 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c49432d..fef8ee5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -915,6 +915,9 @@ Release 2.8.0 - UNRELEASED HDFS-9065. Include commas on # of files, blocks, total filesystem objects in NN Web UI. (Daniel Templeton via wheat9) + HDFS-9008. Balancer#Parameters class could use a builder pattern. + (Chris Trezzo via mingma) + OPTIMIZATIONS HDFS-8026. 
Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 259b280..f3f3d6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -243,7 +243,8 @@ public class Balancer { * namenode as a client and a secondary namenode and retry proxies * when connection fails. */ - Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + Balancer(NameNodeConnector theblockpool, BalancerParameters p, + Configuration conf) { final long movedWinWidth = getLong(conf, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); @@ -265,13 +266,15 @@ public class Balancer { DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); this.nnc = theblockpool; - this.dispatcher = new Dispatcher(theblockpool, p.includedNodes, - p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, - maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf); - this.threshold = p.threshold; - this.policy = p.policy; - this.sourceNodes = p.sourceNodes; - this.runDuringUpgrade = p.runDuringUpgrade; + this.dispatcher = + new Dispatcher(theblockpool, p.getIncludedNodes(), + p.getExcludedNodes(), movedWinWidth, moverThreads, + dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, + getBlocksMinBlockSize, conf); + this.threshold = p.getThreshold(); + this.policy = p.getBalancingPolicy(); + this.sourceNodes = p.getSourceNodes(); 
+ this.runDuringUpgrade = p.getRunDuringUpgrade(); this.maxSizeToMove = getLong(conf, DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, @@ -629,7 +632,7 @@ public class Balancer { * for each namenode, * execute a {@link Balancer} to work through all datanodes once. */ - static int run(Collection<URI> namenodes, final Parameters p, + static int run(Collection<URI> namenodes, final BalancerParameters p, Configuration conf) throws IOException, InterruptedException { final long sleeptime = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -638,24 +641,25 @@ public class Balancer { DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; LOG.info("namenodes = " + namenodes); LOG.info("parameters = " + p); - LOG.info("included nodes = " + p.includedNodes); - LOG.info("excluded nodes = " + p.excludedNodes); - LOG.info("source nodes = " + p.sourceNodes); - + LOG.info("included nodes = " + p.getIncludedNodes()); + LOG.info("excluded nodes = " + p.getExcludedNodes()); + LOG.info("source nodes = " + p.getSourceNodes()); + System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); List<NameNodeConnector> connectors = Collections.emptyList(); try { connectors = NameNodeConnector.newNameNodeConnectors(namenodes, - Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration); - + Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, + p.getMaxIdleIteration()); + boolean done = false; for(int iteration = 0; !done; iteration++) { done = true; Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { - if (p.blockpools.size() == 0 - || p.blockpools.contains(nnc.getBlockpoolID())) { + if (p.getBlockPools().size() == 0 + || p.getBlockPools().contains(nnc.getBlockpoolID())) { final Balancer b = new Balancer(nnc, p, conf); final Result r = b.runOneIteration(); r.print(iteration, System.out); @@ -705,65 +709,6 @@ public class Balancer { return time+" "+unit; } - static class Parameters { - static 
final Parameters DEFAULT = - new Parameters(BalancingPolicy.Node.INSTANCE, 10.0, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections.<String> emptySet(), Collections.<String> emptySet(), - Collections.<String> emptySet(), Collections.<String> emptySet(), - false); - - final BalancingPolicy policy; - final double threshold; - final int maxIdleIteration; - /** Exclude the nodes in this set. */ - final Set<String> excludedNodes; - /** If empty, include any node; otherwise, include only these nodes. */ - final Set<String> includedNodes; - /** If empty, any node can be a source; - * otherwise, use only these nodes as source nodes. - */ - final Set<String> sourceNodes; - /** - * A set of block pools to run the balancer on. - */ - final Set<String> blockpools; - /** - * Whether to run the balancer during upgrade. - */ - final boolean runDuringUpgrade; - - Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, - Set<String> excludedNodes, Set<String> includedNodes, - Set<String> sourceNodes, Set<String> blockpools, - boolean runDuringUpgrade) { - this.policy = policy; - this.threshold = threshold; - this.maxIdleIteration = maxIdleIteration; - this.excludedNodes = excludedNodes; - this.includedNodes = includedNodes; - this.sourceNodes = sourceNodes; - this.blockpools = blockpools; - this.runDuringUpgrade = runDuringUpgrade; - } - - @Override - public String toString() { - return String.format("%s.%s [%s," - + " threshold = %s," - + " max idle iteration = %s," - + " #excluded nodes = %s," - + " #included nodes = %s," - + " #source nodes = %s," - + " #blockpools = %s," - + " run during upgrade = %s]", - Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, - threshold, maxIdleIteration, excludedNodes.size(), - includedNodes.size(), sourceNodes.size(), blockpools.size(), - runDuringUpgrade); - } - } - static class Cli extends Configured implements Tool { /** * Parse arguments and then run Balancer. 
@@ -796,15 +741,10 @@ public class Balancer { } /** parse command line arguments */ - static Parameters parse(String[] args) { - BalancingPolicy policy = Parameters.DEFAULT.policy; - double threshold = Parameters.DEFAULT.threshold; - int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration; - Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes; - Set<String> includedNodes = Parameters.DEFAULT.includedNodes; - Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes; - Set<String> blockpools = Parameters.DEFAULT.blockpools; - boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade; + static BalancerParameters parse(String[] args) { + Set<String> excludedNodes = null; + Set<String> includedNodes = null; + BalancerParameters.Builder b = new BalancerParameters.Builder(); if (args != null) { try { @@ -813,12 +753,13 @@ public class Balancer { checkArgument(++i < args.length, "Threshold value is missing: args = " + Arrays.toString(args)); try { - threshold = Double.parseDouble(args[i]); + double threshold = Double.parseDouble(args[i]); if (threshold < 1 || threshold > 100) { throw new IllegalArgumentException( "Number out of range: threshold = " + threshold); } LOG.info( "Using a threshold of " + threshold ); + b.setThreshold(threshold); } catch(IllegalArgumentException e) { System.err.println( "Expecting a number in the range of [1.0, 100.0]: " @@ -829,7 +770,7 @@ public class Balancer { checkArgument(++i < args.length, "Policy value is missing: args = " + Arrays.toString(args)); try { - policy = BalancingPolicy.parse(args[i]); + b.setBalancingPolicy(BalancingPolicy.parse(args[i])); } catch(IllegalArgumentException e) { System.err.println("Illegal policy name: " + args[i]); throw e; @@ -837,28 +778,33 @@ public class Balancer { } else if ("-exclude".equalsIgnoreCase(args[i])) { excludedNodes = new HashSet<>(); i = processHostList(args, i, "exclude", excludedNodes); + b.setExcludedNodes(excludedNodes); } else if ("-include".equalsIgnoreCase(args[i])) { 
includedNodes = new HashSet<>(); i = processHostList(args, i, "include", includedNodes); + b.setIncludedNodes(includedNodes); } else if ("-source".equalsIgnoreCase(args[i])) { - sourceNodes = new HashSet<>(); + Set<String> sourceNodes = new HashSet<>(); i = processHostList(args, i, "source", sourceNodes); + b.setSourceNodes(sourceNodes); } else if ("-blockpools".equalsIgnoreCase(args[i])) { checkArgument( ++i < args.length, "blockpools value is missing: args = " + Arrays.toString(args)); - blockpools = parseBlockPoolList(args[i]); + Set<String> blockpools = parseBlockPoolList(args[i]); LOG.info("Balancer will run on the following blockpools: " + blockpools.toString()); + b.setBlockpools(blockpools); } else if ("-idleiterations".equalsIgnoreCase(args[i])) { checkArgument(++i < args.length, "idleiterations value is missing: args = " + Arrays .toString(args)); - maxIdleIteration = Integer.parseInt(args[i]); + int maxIdleIteration = Integer.parseInt(args[i]); LOG.info("Using a idleiterations of " + maxIdleIteration); + b.setMaxIdleIteration(maxIdleIteration); } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) { - runDuringUpgrade = true; + b.setRunDuringUpgrade(true); LOG.info("Will run the balancer even during an ongoing HDFS " + "upgrade. 
Most users will not want to run the balancer " + "during an upgrade since it will not affect used space " @@ -868,16 +814,14 @@ public class Balancer { + Arrays.toString(args)); } } - checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(), + checkArgument(excludedNodes == null || includedNodes == null, "-exclude and -include options cannot be specified together."); } catch(RuntimeException e) { printUsage(System.err); throw e; } } - - return new Parameters(policy, threshold, maxIdleIteration, excludedNodes, - includedNodes, sourceNodes, blockpools, runDuringUpgrade); + return b.build(); } private static int processHostList(String[] args, int i, String type, http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java new file mode 100644 index 0000000..5d5e9b1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import java.util.Collections; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +final class BalancerParameters { + private final BalancingPolicy policy; + private final double threshold; + private final int maxIdleIteration; + /** Exclude the nodes in this set. */ + private final Set<String> excludedNodes; + /** If empty, include any node; otherwise, include only these nodes. */ + private final Set<String> includedNodes; + /** + * If empty, any node can be a source; otherwise, use only these nodes as + * source nodes. + */ + private final Set<String> sourceNodes; + /** + * A set of block pools to run the balancer on. + */ + private final Set<String> blockpools; + /** + * Whether to run the balancer during upgrade. 
+ */ + private final boolean runDuringUpgrade; + + static final BalancerParameters DEFAULT = new BalancerParameters(); + + private BalancerParameters() { + this(new Builder()); + } + + private BalancerParameters(Builder builder) { + this.policy = builder.policy; + this.threshold = builder.threshold; + this.maxIdleIteration = builder.maxIdleIteration; + this.excludedNodes = builder.excludedNodes; + this.includedNodes = builder.includedNodes; + this.sourceNodes = builder.sourceNodes; + this.blockpools = builder.blockpools; + this.runDuringUpgrade = builder.runDuringUpgrade; + } + + BalancingPolicy getBalancingPolicy() { + return this.policy; + } + + double getThreshold() { + return this.threshold; + } + + int getMaxIdleIteration() { + return this.maxIdleIteration; + } + + Set<String> getExcludedNodes() { + return this.excludedNodes; + } + + Set<String> getIncludedNodes() { + return this.includedNodes; + } + + Set<String> getSourceNodes() { + return this.sourceNodes; + } + + Set<String> getBlockPools() { + return this.blockpools; + } + + boolean getRunDuringUpgrade() { + return this.runDuringUpgrade; + } + + @Override + public String toString() { + return String.format("%s.%s [%s," + " threshold = %s," + + " max idle iteration = %s," + " #excluded nodes = %s," + + " #included nodes = %s," + " #source nodes = %s," + + " #blockpools = %s," + " run during upgrade = %s]", + Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, + threshold, maxIdleIteration, excludedNodes.size(), + includedNodes.size(), sourceNodes.size(), blockpools.size(), + runDuringUpgrade); + } + + static class Builder { + // Defaults + private BalancingPolicy policy = BalancingPolicy.Node.INSTANCE; + private double threshold = 10.0; + private int maxIdleIteration = + NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS; + private Set<String> excludedNodes = Collections.<String> emptySet(); + private Set<String> includedNodes = Collections.<String> emptySet(); + private Set<String> sourceNodes 
= Collections.<String> emptySet(); + private Set<String> blockpools = Collections.<String> emptySet(); + private boolean runDuringUpgrade = false; + + Builder() { + } + + Builder setBalancingPolicy(BalancingPolicy p) { + this.policy = p; + return this; + } + + Builder setThreshold(double t) { + this.threshold = t; + return this; + } + + Builder setMaxIdleIteration(int m) { + this.maxIdleIteration = m; + return this; + } + + Builder setExcludedNodes(Set<String> nodes) { + this.excludedNodes = nodes; + return this; + } + + Builder setIncludedNodes(Set<String> nodes) { + this.includedNodes = nodes; + return this; + } + + Builder setSourceNodes(Set<String> nodes) { + this.sourceNodes = nodes; + return this; + } + + Builder setBlockpools(Set<String> pools) { + this.blockpools = pools; + return this; + } + + Builder setRunDuringUpgrade(boolean run) { + this.runDuringUpgrade = run; + return this; + } + + BalancerParameters build() { + return new BalancerParameters(this); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index b0223d2..a655d66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -75,8 +75,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; -import 
org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; +import org.apache.hadoop.hdfs.server.balancer.BalancerParameters; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; @@ -319,7 +319,7 @@ public class TestBalancer { * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, - ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p) + ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p) throws IOException, TimeoutException { waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); } @@ -377,7 +377,7 @@ public class TestBalancer { // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); } finally { @@ -393,16 +393,16 @@ public class TestBalancer { * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, - ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, + ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, int expectedExcludedNodes) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? 
Long.MAX_VALUE : Time.monotonicNow() + timeout; - if (!p.includedNodes.isEmpty()) { - totalCapacity = p.includedNodes.size() * CAPACITY; + if (!p.getIncludedNodes().isEmpty()) { + totalCapacity = p.getIncludedNodes().size() * CAPACITY; } - if (!p.excludedNodes.isEmpty()) { - totalCapacity -= p.excludedNodes.size() * CAPACITY; + if (!p.getExcludedNodes().isEmpty()) { + totalCapacity -= p.getExcludedNodes().size() * CAPACITY; } final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; boolean balanced; @@ -415,12 +415,12 @@ public class TestBalancer { for (DatanodeInfo datanode : datanodeReport) { double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity(); - if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) { + if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) { assertTrue(nodeUtilization == 0); actualExcludedNodeCount++; continue; } - if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) { + if (!Dispatcher.Util.isIncluded(p.getIncludedNodes(), datanode)) { assertTrue(nodeUtilization == 0); actualExcludedNodeCount++; continue; @@ -636,16 +636,14 @@ public class TestBalancer { } } // run balancer and validate results - Balancer.Parameters p = Balancer.Parameters.DEFAULT; + BalancerParameters.Builder pBuilder = + new BalancerParameters.Builder(); if (nodes != null) { - p = new Balancer.Parameters( - Balancer.Parameters.DEFAULT.policy, - Balancer.Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(), - Balancer.Parameters.DEFAULT.sourceNodes, - Balancer.Parameters.DEFAULT.blockpools, false); + pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded()); + pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded()); + pBuilder.setRunDuringUpgrade(false); } + BalancerParameters p = pBuilder.build(); int expectedExcludedNodes = 0; if (nodes != null) { @@ -668,14 +666,15 @@ public class TestBalancer { } } - private void 
runBalancer(Configuration conf, - long totalUsedSpace, long totalCapacity) throws Exception { - runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0); + private void runBalancer(Configuration conf, long totalUsedSpace, + long totalCapacity) throws Exception { + runBalancer(conf, totalUsedSpace, totalCapacity, + BalancerParameters.DEFAULT, 0); } - private void runBalancer(Configuration conf, - long totalUsedSpace, long totalCapacity, Balancer.Parameters p, - int excludedNodes) throws Exception { + private void runBalancer(Configuration conf, long totalUsedSpace, + long totalCapacity, BalancerParameters p, int excludedNodes) + throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing @@ -693,7 +692,8 @@ public class TestBalancer { waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); } - private static int runBalancer(Collection<URI> namenodes, final Parameters p, + private static int runBalancer(Collection<URI> namenodes, + final BalancerParameters p, Configuration conf) throws IOException, InterruptedException { final long sleeptime = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -710,8 +710,8 @@ public class TestBalancer { try { connectors = NameNodeConnector.newNameNodeConnectors(namenodes, Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf, - Balancer.Parameters.DEFAULT.maxIdleIteration); - + BalancerParameters.DEFAULT.getMaxIdleIteration()); + boolean done = false; for(int iteration = 0; !done; iteration++) { done = true; @@ -747,45 +747,45 @@ public class TestBalancer { return ExitStatus.SUCCESS.getExitCode(); } - private void runBalancerCli(Configuration conf, - long totalUsedSpace, long totalCapacity, - Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { + private void runBalancerCli(Configuration conf, long totalUsedSpace, + long totalCapacity, BalancerParameters p, boolean useFile, + int 
expectedExcludedNodes) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); List <String> args = new ArrayList<String>(); args.add("-policy"); args.add("datanode"); File excludeHostsFile = null; - if (!p.excludedNodes.isEmpty()) { + if (!p.getExcludedNodes().isEmpty()) { args.add("-exclude"); if (useFile) { excludeHostsFile = new File ("exclude-hosts-file"); PrintWriter pw = new PrintWriter(excludeHostsFile); - for (String host: p.excludedNodes) { + for (String host : p.getExcludedNodes()) { pw.write( host + "\n"); } pw.close(); args.add("-f"); args.add("exclude-hosts-file"); } else { - args.add(StringUtils.join(p.excludedNodes, ',')); + args.add(StringUtils.join(p.getExcludedNodes(), ',')); } } File includeHostsFile = null; - if (!p.includedNodes.isEmpty()) { + if (!p.getIncludedNodes().isEmpty()) { args.add("-include"); if (useFile) { includeHostsFile = new File ("include-hosts-file"); PrintWriter pw = new PrintWriter(includeHostsFile); - for (String host: p.includedNodes){ + for (String host : p.getIncludedNodes()) { pw.write( host + "\n"); } pw.close(); args.add("-f"); args.add("include-hosts-file"); } else { - args.add(StringUtils.join(p.includedNodes, ',')); + args.add(StringUtils.join(p.getIncludedNodes(), ',')); } } @@ -879,14 +879,11 @@ public class TestBalancer { Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Set<String> datanodes = new HashSet<String>(); datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); - Balancer.Parameters p = new Balancer.Parameters( - Balancer.Parameters.DEFAULT.policy, - Balancer.Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - datanodes, Balancer.Parameters.DEFAULT.includedNodes, - Balancer.Parameters.DEFAULT.sourceNodes, - Balancer.Parameters.DEFAULT.blockpools, false); - final int r = Balancer.run(namenodes, p, conf); + BalancerParameters.Builder pBuilder = + new BalancerParameters.Builder(); + 
pBuilder.setExcludedNodes(datanodes); + pBuilder.setRunDuringUpgrade(false); + final int r = Balancer.run(namenodes, pBuilder.build(), conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { cluster.shutdown(); @@ -1081,20 +1078,20 @@ public class TestBalancer { @Test public void testBalancerCliParseBlockpools() { String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" }; - Balancer.Parameters p = Balancer.Cli.parse(parameters); - assertEquals(3, p.blockpools.size()); + BalancerParameters p = Balancer.Cli.parse(parameters); + assertEquals(3, p.getBlockPools().size()); parameters = new String[] { "-blockpools", "bp-1" }; p = Balancer.Cli.parse(parameters); - assertEquals(1, p.blockpools.size()); + assertEquals(1, p.getBlockPools().size()); parameters = new String[] { "-blockpools", "bp-1,,bp-2" }; p = Balancer.Cli.parse(parameters); - assertEquals(3, p.blockpools.size()); + assertEquals(3, p.getBlockPools().size()); parameters = new String[] { "-blockpools", "bp-1," }; p = Balancer.Cli.parse(parameters); - assertEquals(1, p.blockpools.size()); + assertEquals(1, p.getBlockPools().size()); } /** @@ -1123,7 +1120,8 @@ public class TestBalancer { excludeHosts.add( "datanodeZ"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - excludeHosts, Parameters.DEFAULT.includedNodes), false, false); + excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), + false, false); } /** @@ -1151,9 +1149,11 @@ public class TestBalancer { Set<String> excludeHosts = new HashSet<String>(); excludeHosts.add( "datanodeY"); excludeHosts.add( "datanodeZ"); - doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, - new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts, - Parameters.DEFAULT.includedNodes), true, false); + doTest(conf, new long[] { CAPACITY, CAPACITY }, + new 
String[] { RACK0, RACK1 }, CAPACITY, RACK2, new HostNameBasedNodes( + new String[] { "datanodeX", "datanodeY", "datanodeZ" }, + excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true, + false); } /** @@ -1183,7 +1183,8 @@ public class TestBalancer { excludeHosts.add( "datanodeZ"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - excludeHosts, Parameters.DEFAULT.includedNodes), true, true); + excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true, + true); } /** @@ -1212,7 +1213,8 @@ public class TestBalancer { includeHosts.add( "datanodeY"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - Parameters.DEFAULT.excludedNodes, includeHosts), false, false); + BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), + false, false); } /** @@ -1241,7 +1243,8 @@ public class TestBalancer { includeHosts.add( "datanodeY"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - Parameters.DEFAULT.excludedNodes, includeHosts), true, false); + BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true, + false); } /** @@ -1270,7 +1273,8 @@ public class TestBalancer { includeHosts.add( "datanodeY"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - Parameters.DEFAULT.excludedNodes, includeHosts), true, true); + BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true, + true); } /** @@ -1343,7 +1347,7 @@ public class TestBalancer { Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); // Run Balancer - final Balancer.Parameters p = Parameters.DEFAULT; + final 
BalancerParameters p = BalancerParameters.DEFAULT; final int r = Balancer.run(namenodes, p, conf); // Validate no RAM_DISK block should be moved @@ -1395,7 +1399,7 @@ public class TestBalancer { Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); // Run balancer - final Balancer.Parameters p = Parameters.DEFAULT; + final BalancerParameters p = BalancerParameters.DEFAULT; fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); @@ -1406,14 +1410,10 @@ public class TestBalancer { Balancer.run(namenodes, p, conf)); // Should work with the -runDuringUpgrade flag. - final Balancer.Parameters runDuringUpgrade = - new Balancer.Parameters(Parameters.DEFAULT.policy, - Parameters.DEFAULT.threshold, - Parameters.DEFAULT.maxIdleIteration, - Parameters.DEFAULT.excludedNodes, - Parameters.DEFAULT.includedNodes, - Parameters.DEFAULT.sourceNodes, - Balancer.Parameters.DEFAULT.blockpools, true); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setRunDuringUpgrade(true); + final BalancerParameters runDuringUpgrade = b.build(); assertEquals(ExitStatus.SUCCESS.getExitCode(), Balancer.run(namenodes, runDuringUpgrade, conf)); @@ -1480,7 +1480,7 @@ public class TestBalancer { // update space info cluster.triggerHeartbeats(); - Balancer.Parameters p = Balancer.Parameters.DEFAULT; + BalancerParameters p = BalancerParameters.DEFAULT; Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = Balancer.run(namenodes, p, conf); @@ -1612,12 +1612,11 @@ public class TestBalancer { final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); { // run Balancer with min-block-size=50 - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections.<String> emptySet(), Collections.<String> emptySet(), - Collections.<String> emptySet(), - Balancer.Parameters.DEFAULT.blockpools, false); + 
BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final int r = Balancer.run(namenodes, p, conf); @@ -1632,11 +1631,12 @@ public class TestBalancer { for(int i = capacities.length; i < datanodes.size(); i++) { sourceNodes.add(datanodes.get(i).getDisplayName()); } - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections.<String> emptySet(), Collections.<String> emptySet(), - sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + b.setSourceNodes(sourceNodes); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final int r = Balancer.run(namenodes, p, conf); @@ -1647,11 +1647,12 @@ public class TestBalancer { final Set<String> sourceNodes = new HashSet<>(); final List<DataNode> datanodes = cluster.getDataNodes(); sourceNodes.add(datanodes.get(0).getDisplayName()); - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections.<String> emptySet(), Collections.<String> emptySet(), - sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + b.setSourceNodes(sourceNodes); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); final int r = Balancer.run(namenodes, p, conf); @@ -1664,11 +1665,12 @@ public class TestBalancer { for(int i = 0; i < capacities.length; i++) { 
sourceNodes.add(datanodes.get(i).getDisplayName()); } - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections.<String> emptySet(), Collections.<String> emptySet(), - sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + b.setSourceNodes(sourceNodes); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); final int r = Balancer.run(namenodes, p, conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index 7559de4..1693cf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes { Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, - cluster, Balancer.Parameters.DEFAULT); + cluster, 
BalancerParameters.DEFAULT); } finally { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index b07ad89..c5d16ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.balancer.BalancerParameters; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.log4j.Level; @@ -84,10 +85,10 @@ public class TestBalancerWithMultipleNameNodes { final MiniDFSCluster cluster; final ClientProtocol[] clients; final short replication; - final Balancer.Parameters parameters; + final BalancerParameters parameters; Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes, - Balancer.Parameters parameters, Configuration conf) throws IOException { + BalancerParameters parameters, Configuration conf) throws IOException { this.conf = conf; this.cluster = cluster; clients = new ClientProtocol[nNameNodes]; @@ -204,7 +205,7 @@ public class TestBalancerWithMultipleNameNodes { balanced = true; for(int d = 0; d < 
used.length; d++) { final double p = used[d]*100.0/cap[d]; - balanced = p <= avg + s.parameters.threshold; + balanced = p <= avg + s.parameters.getThreshold(); if (!balanced) { if (i % 100 == 0) { LOG.warn("datanodes " + d + " is not yet balanced: " @@ -278,13 +279,14 @@ public class TestBalancerWithMultipleNameNodes { DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException { Map<Integer, DatanodeStorageReport[]> reports = new HashMap<Integer, DatanodeStorageReport[]>(); - if (s.parameters.blockpools.size() == 0) { + if (s.parameters.getBlockPools().size() == 0) { // the blockpools parameter was not set, so we don't need to track any // blockpools. return Collections.emptyMap(); } for (int i = 0; i < s.clients.length; i++) { - if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i) + if (s.parameters.getBlockPools().contains( + s.cluster.getNamesystem(i) .getBlockPoolId())) { // we want to ensure that blockpools not specified by the balancer // parameters were left alone. 
Therefore, if the pool was specified, @@ -388,14 +390,10 @@ public class TestBalancerWithMultipleNameNodes { for (int i = 0; i < nNameNodesToBalance; i++) { blockpools.add(cluster.getNamesystem(i).getBlockPoolId()); } - Balancer.Parameters params = - new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy, - Balancer.Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - Balancer.Parameters.DEFAULT.excludedNodes, - Balancer.Parameters.DEFAULT.includedNodes, - Balancer.Parameters.DEFAULT.sourceNodes, blockpools, - Balancer.Parameters.DEFAULT.runDuringUpgrade); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBlockpools(blockpools); + BalancerParameters params = b.build(); final Suite s = new Suite(cluster, nNameNodes, nDataNodes, params, conf); for(int n = 0; n < nNameNodes; n++) { @@ -455,7 +453,7 @@ public class TestBalancerWithMultipleNameNodes { LOG.info("RUN_TEST 1"); final Suite s = new Suite(cluster, nNameNodes, nDataNodes, - Balancer.Parameters.DEFAULT, conf); + BalancerParameters.DEFAULT, conf); long totalCapacity = TestBalancer.sum(capacities); LOG.info("RUN_TEST 2"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/083b44c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 7af3a0e..bfa2835 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -175,7 +175,7 @@ public class TestBalancerWithNodeGroup { // start rebalancing 
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); waitForHeartBeat(totalUsedSpace, totalCapacity); @@ -189,7 +189,7 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); waitForHeartBeat(totalUsedSpace, totalCapacity);
