HDFS-9545: DiskBalancer: Add Plan Command. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/75882ec0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/75882ec0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/75882ec0 Branch: refs/heads/HDFS-1312 Commit: 75882ec0b096da862b8b373b70a091c19f281b2a Parents: 1594b47 Author: Anu Engineer <aengin...@apache.org> Authored: Mon May 9 10:17:56 2016 -0700 Committer: Arpit Agarwal <a...@apache.org> Committed: Thu Jun 23 18:21:08 2016 -0700 ---------------------------------------------------------------------- .../hadoop-hdfs/src/main/bin/hdfs | 6 + .../server/diskbalancer/command/Command.java | 381 +++++++++++++++++++ .../diskbalancer/command/PlanCommand.java | 217 +++++++++++ .../diskbalancer/command/package-info.java | 22 ++ .../datamodel/DiskBalancerCluster.java | 89 +++-- .../datamodel/DiskBalancerDataNode.java | 2 +- .../datamodel/DiskBalancerVolumeSet.java | 2 +- .../diskbalancer/planner/GreedyPlanner.java | 4 +- .../diskbalancer/planner/PlannerFactory.java | 6 +- .../apache/hadoop/hdfs/tools/DiskBalancer.java | 260 +++++++++++++ .../TestDiskBalancerWithMockMover.java | 2 - 11 files changed, 945 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index 2a29d17..7952560 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -39,6 +39,7 @@ function hadoop_usage hadoop_add_subcommand "debug" "run a Debug Admin to execute HDFS debug commands" hadoop_add_subcommand "dfs" "run a filesystem command on the file system" hadoop_add_subcommand "dfsadmin" "run a DFS admin client" + hadoop_add_subcommand "diskbalancer" "Distributes data evenly among disks on a given node" hadoop_add_subcommand "envvars" "display computed Hadoop environment variables" hadoop_add_subcommand "erasurecode" "run a HDFS ErasureCoding CLI" hadoop_add_subcommand "fetchdt" "fetch a delegation token from the NameNode" @@ -125,6 +126,11 @@ function hdfscmd_case hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" ;; + diskbalancer) + HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.DiskBalancer + hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" + HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" + ;; envvars) echo "JAVA_HOME='${JAVA_HOME}'" echo "HADOOP_HDFS_HOME='${HADOOP_HDFS_HOME}'" http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java new file mode 100644 index 0000000..6522434 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.command; + +import com.google.common.base.Preconditions; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.tools.DiskBalancer; +import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; +import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; + +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.List; + +/** + * Common interface for command handling. + */ +public abstract class Command extends Configured { + static final Logger LOG = LoggerFactory.getLogger(Command.class); + private Map<String, String> validArgs = new HashMap<>(); + private URI clusterURI; + private FileSystem fs = null; + private DiskBalancerCluster cluster = null; + + private static final Path DEFAULT_LOG_DIR = new Path("/system/diskbalancer"); + + private Path diskBalancerLogs; + + /** + * Constructs a command. + */ + public Command(Configuration conf) { + super(conf); + // These arguments are valid for all commands. + addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " + + "file URI for cluster"); + addValidCommandParameters(DiskBalancer.HELP, "Help for this command"); + addValidCommandParameters("arg", ""); + } + + /** + * Executes the Client Calls. + * + * @param cmd - CommandLine + * @throws IOException + * @throws URISyntaxException + */ + public abstract void execute(CommandLine cmd) throws Exception; + + /** + * Gets extended help for this command. 
+ * + * @return Help Message + */ + protected abstract String getHelp(); + + /** + * Verifies the user-provided URI. + * + * @param uri - URI string + * @return URL + * @throws URISyntaxException, MalformedURLException + */ + protected URI verifyURI(String uri) + throws URISyntaxException, MalformedURLException { + if ((uri == null) || uri.isEmpty()) { + throw new MalformedURLException( + "A valid URI is needed to execute this command."); + } + return new URI(uri); + } + + /** + * Processes the URI and returns the cluster with its nodes set up. This is + * used in all commands. + * + * @param cmd - CommandLine + * @return DiskBalancerCluster + * @throws Exception + */ + protected DiskBalancerCluster readClusterInfo(CommandLine cmd) throws + Exception { + Preconditions.checkNotNull(cmd); + Preconditions + .checkState(cmd.getOptionValue(DiskBalancer.NAMENODEURI) != null, + "Required argument missing : uri"); + + setClusterURI(verifyURI(cmd.getOptionValue(DiskBalancer.NAMENODEURI))); + LOG.debug("using name node URI : {}", this.getClusterURI()); + ClusterConnector connector = ConnectorFactory.getCluster(this.clusterURI, + getConf()); + + cluster = new DiskBalancerCluster(connector); + + LOG.debug("Reading cluster info"); + cluster.readClusterInfo(); + return cluster; + } + + /** + * Sets up the output path. + * + * @param path - Path or null to use default path. + * @throws IOException + */ + protected void setOutputPath(String path) throws IOException { + + SimpleDateFormat format = new SimpleDateFormat("yyyy-MMM-dd-HH-mm-ss"); + Date now = new Date(); + + fs = FileSystem.get(getClusterURI(), getConf()); + if (path == null || path.isEmpty()) { + if (getClusterURI().getScheme().startsWith("file")) { + diskBalancerLogs = new Path( + System.getProperty("user.dir") + DEFAULT_LOG_DIR.toString() + + format.format(now)); + } else { + diskBalancerLogs = new Path(DEFAULT_LOG_DIR.toString() + + format.format(now)); + } + } else { + diskBalancerLogs = new Path(path); + } + if (fs.exists(diskBalancerLogs)) { + LOG.error("Another DiskBalancer instance may be running - target " + + "directory already exists. {}", diskBalancerLogs); + throw new IOException("DiskBalancer files already exist at the " + + "target location. " + diskBalancerLogs.toString()); + } + fs.mkdirs(diskBalancerLogs); + } + + /** + * Sets the nodes to process. + * + * @param node - Node + */ + protected void setNodesToProcess(DiskBalancerDataNode node) { + List<DiskBalancerDataNode> nodelist = new LinkedList<>(); + nodelist.add(node); + setNodesToProcess(nodelist); + } + + /** + * Sets the list of Nodes to process. + * + * @param nodes Nodes. + */ + protected void setNodesToProcess(List<DiskBalancerDataNode> nodes) { + if (cluster == null) { + throw new IllegalStateException("Set nodes to process invoked before " + + "initializing cluster. Illegal usage."); + } + cluster.setNodesToProcess(nodes); + } + + /** + * Returns a DiskBalancer Node from the Cluster or null if not found. + * + * @param nodeName - can be the hostname, IP address or UUID of the node. + * @return - DataNode if found.
+ */ + DiskBalancerDataNode getNode(String nodeName) { + DiskBalancerDataNode node = null; + if (nodeName == null || nodeName.isEmpty()) { + return node; + } + if (cluster.getNodes().size() == 0) { + return node; + } + + node = cluster.getNodeByName(nodeName); + if (node != null) { + return node; + } + + node = cluster.getNodeByIPAddress(nodeName); + if (node != null) { + return node; + } + node = cluster.getNodeByUUID(nodeName); + return node; + } + + /** + * Gets the node set from a file or a string. + * + * @param listArg - String File URL or a comma separated list of node names. + * @return Set of node names + * @throws IOException + */ + private Set<String> getNodeList(String listArg) throws IOException { + URL listURL; + String nodeData; + Set<String> resultSet = new TreeSet<>(); + + if ((listArg == null) || listArg.isEmpty()) { + return resultSet; + } + if (listArg.startsWith("file://")) { + listURL = new URL(listArg); + byte[] data = Files.readAllBytes(Paths.get(listURL.getPath())); + nodeData = new String(data, Charset.forName("UTF-8")); + } else { + nodeData = listArg; + } + + String[] nodes = nodeData.split(","); + Collections.addAll(resultSet, nodes); + return resultSet; + } + + /** + * Verifies if the command line options are sane. + * + * @param commandName - Name of the command + * @param cmd - Parsed Command Line + */ + protected void verifyCommandOptions(String commandName, CommandLine cmd) { + @SuppressWarnings("unchecked") + Iterator<Option> iter = cmd.iterator(); + while (iter.hasNext()) { + Option opt = iter.next(); + if (!validArgs.containsKey(opt.getArgName())) { + String errMessage = String + .format("%nInvalid argument found for command %s : %s%n", + commandName, opt.getArgName()); + StringBuilder validArguments = new StringBuilder(); + validArguments.append("Valid arguments are : %n"); + for (Map.Entry<String, String> args : validArgs.entrySet()) { + String key = args.getKey(); + String desc = args.getValue(); + String s = String.format("\t %s : %s %n", key, desc); + validArguments.append(s); + } + LOG.error(errMessage + validArguments.toString()); + throw new IllegalArgumentException("Invalid Arguments found."); + } + } + } + + /** + * Gets cluster URL. + * + * @return - URL + */ + public URI getClusterURI() { + return clusterURI; + } + + /** + * Set cluster URL. + * + * @param clusterURI - URL + */ + public void setClusterURI(URI clusterURI) { + this.clusterURI = clusterURI; + } + + /** + * Copied from DFSAdmin.java. -- Creates a connection to dataNode. + * + * @param datanode - dataNode. + * @return ClientDataNodeProtocol + * @throws IOException + */ + public ClientDatanodeProtocol getDataNodeProxy(String datanode) + throws IOException { + InetSocketAddress datanodeAddr = NetUtils.createSocketAddr(datanode); + + // For datanode proxy the server principal should be DN's one. + getConf().set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, + getConf().get(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, "")); + + // Create the client + ClientDatanodeProtocol dnProtocol = + DFSUtilClient.createClientDatanodeProtocolProxy(datanodeAddr, getUGI(), + getConf(), NetUtils.getSocketFactory(getConf(), + ClientDatanodeProtocol + .class)); + return dnProtocol; + } + + /** + * Returns UGI. + * + * @return UserGroupInformation. + * @throws IOException + */ + private static UserGroupInformation getUGI() + throws IOException { + return UserGroupInformation.getCurrentUser(); + } + + /** + * Returns a file created in the cluster. 
+ * + * @param fileName - fileName to open. + * @return OutputStream. + */ + protected FSDataOutputStream create(String fileName) throws IOException { + return fs.create(new Path(this.diskBalancerLogs, fileName)); + } + + /** + * Returns the output path where the plan and snapshot gets written. + * + * @return Path + */ + protected Path getOutputPath() { + return diskBalancerLogs; + } + + /** + * Adds valid params to the valid args table. + * + * @param key + * @param desc + */ + protected void addValidCommandParameters(String key, String desc) { + validArgs.put(key, desc); + } + + /** + * Returns the cluster. + * + * @return Cluster. + */ + protected DiskBalancerCluster getCluster() { + return cluster; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java new file mode 100644 index 0000000..2422215 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.diskbalancer.command; + +import com.google.common.base.Preconditions; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.tools.DiskBalancer; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Class that implements Plan Command. + * <p> + * Plan command reads the Cluster Info and creates a plan for specified data + * node or a set of Data nodes. + * <p> + * It writes the output to a default location unless changed by the user. + */ +public class PlanCommand extends Command { + private double thresholdPercentage; + private int bandwidth; + private int maxError; + + /** + * Constructs a plan command. 
+ */ + public PlanCommand(Configuration conf) { + super(conf); + this.thresholdPercentage = 1; + this.bandwidth = 0; + this.maxError = 0; + addValidCommandParameters(DiskBalancer.OUTFILE, "Output file"); + addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " + + "be used while copying."); + addValidCommandParameters(DiskBalancer.THRESHOLD, "Percentage skew that " + + "we tolerate before diskbalancer starts working."); + addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " + + "between 2 disks"); + addValidCommandParameters(DiskBalancer.NODE, "Name / Address of the node."); + addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " + + "verbose mode."); + } + + /** + * Runs the plan command. This command can be run with various options like + * <p> + * -plan -node IP, -plan -node hostName, or -plan -node DatanodeUUID + * + * @param cmd - CommandLine + */ + @Override + public void execute(CommandLine cmd) throws Exception { + LOG.debug("Processing Plan Command."); + Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN)); + verifyCommandOptions(DiskBalancer.PLAN, cmd); + + if (!cmd.hasOption(DiskBalancer.NODE)) { + throw new IllegalArgumentException("A node name is required to create a" + + " plan."); + } + + if (cmd.hasOption(DiskBalancer.BANDWIDTH)) { + this.bandwidth = Integer.parseInt(cmd.getOptionValue(DiskBalancer + .BANDWIDTH)); + } + + if (cmd.hasOption(DiskBalancer.MAXERROR)) { + this.maxError = Integer.parseInt(cmd.getOptionValue(DiskBalancer + .MAXERROR)); + } + + readClusterInfo(cmd); + String output = null; + if (cmd.hasOption(DiskBalancer.OUTFILE)) { + output = cmd.getOptionValue(DiskBalancer.OUTFILE); + } + setOutputPath(output); + + DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.NODE)); + if (node == null) { + throw new IllegalArgumentException("Unable to find the specified node. " + + cmd.getOptionValue(DiskBalancer.NODE)); + } + this.thresholdPercentage = getThresholdPercentage(cmd); + setNodesToProcess(node); + + List<NodePlan> plans = getCluster().computePlan(this.thresholdPercentage); + setPlanParams(plans); + + LOG.info("Writing plan to : {}", getOutputPath()); + System.out.printf("Writing plan to : %s%n", getOutputPath()); + + try(FSDataOutputStream beforeStream = create(String.format( + DiskBalancer.BEFORE_TEMPLATE, + cmd.getOptionValue(DiskBalancer.NODE)))) { + beforeStream.write(getCluster().toJson() + .getBytes(StandardCharsets.UTF_8)); + } + + try(FSDataOutputStream planStream = create(String.format( + DiskBalancer.PLAN_TEMPLATE, + cmd.getOptionValue(DiskBalancer.NODE)))) { + planStream.write(getPlan(plans).getBytes(StandardCharsets.UTF_8)); + } + + if (cmd.hasOption(DiskBalancer.VERBOSE)) { + printToScreen(plans); + } + } + + /** + * Gets extended help for this command. + * + * @return Help Message + */ + @Override + protected String getHelp() { + return "This command creates a disk balancer plan for a given datanode"; + } + + /** + * Gets the threshold for planning purposes. + * + * @param cmd - Command Line Argument.
+ * @return double + */ + private double getThresholdPercentage(CommandLine cmd) { + Double value = 0.0; + if (cmd.hasOption(DiskBalancer.THRESHOLD)) { + value = Double.parseDouble(cmd.getOptionValue(DiskBalancer.THRESHOLD)); + } + + if ((value <= 0.0) || (value > 100.0)) { + value = getConf().getDouble( + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT, + DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT); + } + return value; + } + + /** + * Prints a quick summary of the plan to screen. + * + * @param plans - List of NodePlans. + */ + private static void printToScreen(List<NodePlan> plans) { + System.out.println("\nPlan :\n"); + System.out.println(StringUtils.repeat("=", 80)); + System.out.println("Source Disk\t\t Dest.Disk\t\t Move Size\t Type\n "); + for (NodePlan plan : plans) { + for (Step step : plan.getVolumeSetPlans()) { + System.out.println(String.format("%s\t%s\t%s\t%s", + step.getSourceVolume().getPath(), + step.getDestinationVolume().getPath(), + step.getSizeString(step.getBytesToMove()), + step.getDestinationVolume().getStorageType())); + } + } + + System.out.println(StringUtils.repeat("=", 80)); + } + + /** + * Sets user-specified plan parameters. + * + * @param plans - list of plans. + */ + private void setPlanParams(List<NodePlan> plans) { + for (NodePlan plan : plans) { + for (Step step : plan.getVolumeSetPlans()) { + if (this.bandwidth > 0) { + step.setBandwidth(this.bandwidth); + } + if (this.maxError > 0) { + step.setMaxDiskErrors(this.maxError); + } + } + } + } + + /** + * Returns a JSON representation of the plans. + * + * @param plan - List of plans. + * @return String. + * @throws IOException + */ + private String getPlan(List<NodePlan> plan) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(plan); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/package-info.java new file mode 100644 index 0000000..e5494ef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + Commands for disk balancer command line tool.
+ */ +package org.apache.hadoop.hdfs.server.diskbalancer.command; http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java index 7b82278..17a6ebb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java @@ -31,12 +31,13 @@ import org.codehaus.jackson.map.ObjectMapper; import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Set; import java.util.TreeSet; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -76,6 +77,13 @@ public class DiskBalancerCluster { @JsonIgnore private List<DiskBalancerDataNode> nodesToProcess; + @JsonIgnore + private final Map<String, DiskBalancerDataNode> ipList; + @JsonIgnore + private final Map<String, DiskBalancerDataNode> hostNames; + @JsonIgnore + private final Map<String, DiskBalancerDataNode> hostUUID; + private float threshold; /** @@ -85,7 +93,9 @@ public class DiskBalancerCluster { nodes = new LinkedList<>(); exclusionList = new TreeSet<>(); inclusionList = new TreeSet<>(); - + ipList = new HashMap<>(); + hostNames = new HashMap<>(); + hostUUID = new HashMap<>(); } /** @@ -95,10 +105,9 @@ public class DiskBalancerCluster { * @throws IOException */ public DiskBalancerCluster(ClusterConnector connector) throws IOException { + this(); Preconditions.checkNotNull(connector); clusterConnector = connector; - exclusionList = new TreeSet<>(); - inclusionList = new TreeSet<>(); } /** @@ -119,8 +128,25 @@ public class DiskBalancerCluster { */ public void readClusterInfo() throws Exception { Preconditions.checkNotNull(clusterConnector); - LOG.info("Using connector : " + clusterConnector.getConnectorInfo()); + LOG.debug("Using connector : {}" , clusterConnector.getConnectorInfo()); nodes = clusterConnector.getNodes(); + for(DiskBalancerDataNode node : nodes) { + + if(node.getDataNodeIP()!= null && !node.getDataNodeIP().isEmpty()) { + ipList.put(node.getDataNodeIP(), node); + } + + if(node.getDataNodeName() != null && !node.getDataNodeName().isEmpty()) { + // TODO : should we support Internationalized Domain Names ? + // Disk balancer assumes that host names are ascii. If not + // end user can always balance the node via IP address or DataNode UUID. + hostNames.put(node.getDataNodeName().toLowerCase(Locale.US), node); + } + + if(node.getDataNodeUUID() != null && !node.getDataNodeUUID().isEmpty()) { + hostUUID.put(node.getDataNodeUUID(), node); + } + } } /** @@ -259,30 +285,6 @@ public class DiskBalancerCluster { } /** - * Creates an Output directory for the cluster output. 
- * - * @throws IOException - On failure to create an new directory - */ - public void createOutPutDirectory() throws IOException { - if (Files.exists(Paths.get(this.getOutput()))) { - LOG.error("An output directory already exists at this location. Path : " + - this.getOutput()); - throw new IOException( - "An output directory already exists at this location. Path : " + - this.getOutput()); - } - - File f = new File(this.getOutput()); - if (!f.mkdirs()) { - LOG.error("Unable to create the output directory. Path : " + this - .getOutput()); - throw new IOException( - "Unable to create the output directory. Path : " + this.getOutput()); - } - LOG.info("Output directory created. Path : " + this.getOutput()); - } - - /** * Compute plan takes a node and constructs a planner that creates a plan that * we would like to follow. * <p/> @@ -294,7 +296,7 @@ * @param thresholdPercent - in percentage * @return list of NodePlans */ - public List<NodePlan> computePlan(float thresholdPercent) { + public List<NodePlan> computePlan(double thresholdPercent) { List<NodePlan> planList = new LinkedList<>(); if (nodesToProcess == null) { @@ -366,11 +368,24 @@ * @return DiskBalancerDataNode. */ public DiskBalancerDataNode getNodeByUUID(String uuid) { - for(DiskBalancerDataNode node : this.getNodes()) { - if(node.getDataNodeUUID().equals(uuid)) { - return node; - } - } - return null; + return hostUUID.get(uuid); + } + + /** + * Returns a node by IP Address. + * @param ipAddress - IP address String. + * @return DiskBalancerDataNode. + */ + public DiskBalancerDataNode getNodeByIPAddress(String ipAddress) { + return ipList.get(ipAddress); + } + + /** + * Returns a node by hostName. + * @param hostName - HostName. + * @return DiskBalancerDataNode.
+ */ + public DiskBalancerDataNode getNodeByName(String hostName) { + return hostNames.get(hostName); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java index f70a983..a200f4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java @@ -220,7 +220,7 @@ public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> { * @param threshold - Percentage * @return true or false */ - public boolean isBalancingNeeded(float threshold) { + public boolean isBalancingNeeded(double threshold) { for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) { if (vSet.isBalancingNeeded(threshold)) { return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java index 70d7536..97d8e28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java @@ -267,7 +267,7 @@ public class DiskBalancerVolumeSet { * * @return true if balancing is needed false otherwise. */ - public boolean isBalancingNeeded(float thresholdPercentage) { + public boolean isBalancingNeeded(double thresholdPercentage) { double threshold = thresholdPercentage / 100.0d; if(volumes == null || volumes.size() <= 1) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java index 88ddca4..b3d51c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/GreedyPlanner.java @@ -44,7 +44,7 @@ public class GreedyPlanner implements Planner { public static final long TB = GB * 1024L; private static final Logger LOG = LoggerFactory.getLogger(GreedyPlanner.class); - private final float threshold; + private final double threshold; /** * Constructs a greedy planner. 
@@ -52,7 +52,7 @@ public class GreedyPlanner implements Planner { * @param threshold - Disk tolerance that we are ok with * @param node - node on which this planner is operating upon */ - public GreedyPlanner(float threshold, DiskBalancerDataNode node) { + public GreedyPlanner(double threshold, DiskBalancerDataNode node) { this.threshold = threshold; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/PlannerFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/PlannerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/PlannerFactory.java index 24f2970..3566438 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/PlannerFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/planner/PlannerFactory.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * <p/> + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -38,7 +38,7 @@ public final class PlannerFactory { * @return Planner */ public static Planner getPlanner(String plannerName, - DiskBalancerDataNode node, float threshold) { + DiskBalancerDataNode node, double threshold) { if (plannerName.equals(GREEDY_PLANNER)) { if (LOG.isDebugEnabled()) { String message = String http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java new file mode 100644 index 0000000..e44d3dc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdfs.tools; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.diskbalancer.command.Command; +import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; + +/** + * DiskBalancer is a tool that can be used to ensure that data is spread evenly + * across volumes of the same storage type. + * <p> + * For example, if you have 3 disks with 100 GB, 600 GB and 200 GB on each + * disk, this tool will ensure that each disk will have 300 GB. + * <p> + * This tool can be run while data nodes are fully functional. + * <p> + * At a very high level, diskbalancer computes a set of moves that will make + * disk utilization equal and then those moves are executed by the datanode. + */ +public class DiskBalancer extends Configured implements Tool { + /** + * Construct a DiskBalancer. + * + * @param conf + */ + public DiskBalancer(Configuration conf) { + super(conf); + } + + /** + * NameNodeURI can point to either a real namenode, or a JSON file that + * contains the diskBalancer data in JSON form, which jsonNodeConnector knows + * how to deserialize. + * <p> + * Expected formats are : + * <p> + * hdfs://namenode.uri or file:///data/myCluster.json + */ + public static final String NAMENODEURI = "uri"; + + /** + * Computes a plan for a given set of nodes. + */ + public static final String PLAN = "plan"; + + /** + * Output file name, for commands like report, plan etc. This is an optional + * argument, by default diskbalancer will write all its output to + * /system/diskbalancer of the current cluster it is operating + * against. + */ + public static final String OUTFILE = "out"; + + /** + * Help for the program. + */ + public static final String HELP = "help"; + + /** + * Percentage of data unevenness that we are willing to live with. For + * example, a value of 10 indicates that we are okay with 10% +/- from the + * ideal storage target. + */ + public static final String THRESHOLD = "thresholdPercentage"; + + /** + * Specifies the maximum disk bandwidth to use per second. + */ + public static final String BANDWIDTH = "bandwidth"; + + /** + * Specifies the maximum errors to tolerate. + */ + public static final String MAXERROR = "maxerror"; + + /** + * Node name or IP against which Disk Balancer is being run. + */ + public static final String NODE = "node"; + + /** + * Runs the command in verbose mode. + */ + public static final String VERBOSE = "v"; + + /** + * Template for the Before File. It is node.before.json. + */ + public static final String BEFORE_TEMPLATE = "%s.before.json"; + + /** + * Template for the plan file. It is node.plan.json. + */ + public static final String PLAN_TEMPLATE = "%s.plan.json"; + + private static final Logger LOG = + LoggerFactory.getLogger(DiskBalancer.class); + + /** + * Main for the DiskBalancer Command handling.
+ * + * @param argv - System Args Strings[] + * @throws Exception + */ + public static void main(String[] argv) throws Exception { + DiskBalancer shell = new DiskBalancer(new HdfsConfiguration()); + int res = 0; + try { + res = ToolRunner.run(shell, argv); + } catch (Exception ex) { + LOG.error(ex.toString()); + System.exit(1); + } + System.exit(res); + } + + /** + * Execute the command with the given arguments. + * + * @param args command specific arguments. + * @return exit code. + * @throws Exception + */ + @Override + public int run(String[] args) throws Exception { + Options opts = getOpts(); + CommandLine cmd = parseArgs(args, opts); + return dispatch(cmd, opts); + } + + /** + * Returns the command line options. + * + * @return Options + */ + private Options getOpts() { + Options opts = new Options(); + addCommands(opts); + return opts; + } + + /** + * Adds commands that we handle to opts. + * + * @param opt - Options + */ + private void addCommands(Options opt) { + + Option nameNodeUri = + new Option(NAMENODEURI, true, "NameNode URI. e.g. hdfs://namenode" + + ".mycluster.com or file:///myCluster" + + ".json"); + opt.addOption(nameNodeUri); + + Option outFile = + new Option(OUTFILE, true, "File to write output to, if not specified " + + "defaults will be used. " + + "e.g. -out outfile.txt"); + opt.addOption(outFile); + + Option plan = new Option(PLAN, false, "write plan to the default file"); + opt.addOption(plan); + + Option bandwidth = new Option(BANDWIDTH, true, "Maximum disk bandwidth to" + + " be consumed by diskBalancer. " + + "Expressed as MBs per second."); + opt.addOption(bandwidth); + + Option threshold = new Option(THRESHOLD, true, "Percentage skew that we " + + "tolerate before diskbalancer starts working or stops when reaching " + + "that range."); + opt.addOption(threshold); + + Option maxErrors = new Option(MAXERROR, true, "Describes how many errors " + + "can be tolerated while copying between a pair of disks."); + opt.addOption(maxErrors); + + Option node = new Option(NODE, true, "Node Name or IP"); + opt.addOption(node); + + Option help = + new Option(HELP, true, "Help about a command or this message"); + opt.addOption(help); + + } + + /** + * This function parses all command line arguments and returns the + * appropriate values. + * + * @param argv - Argv from main + * @return CommandLine + */ + private CommandLine parseArgs(String[] argv, Options opts) + throws org.apache.commons.cli.ParseException { + BasicParser parser = new BasicParser(); + return parser.parse(opts, argv); + } + + /** + * Dispatches calls to the right command handler classes.
+ * + * @param cmd - CommandLine + * @throws IOException + * @throws URISyntaxException + */ + private int dispatch(CommandLine cmd, Options opts) + throws IOException, URISyntaxException { + Command currentCommand = null; + + try { + if (cmd.hasOption(DiskBalancer.PLAN)) { + currentCommand = new PlanCommand(getConf()); + } else { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]", + "disk balancer commands", opts, + "Please correct your command and try again."); + return 1; + } + + currentCommand.execute(cmd); + + } catch (Exception ex) { + System.err.printf(ex.getMessage()); + return 1; + } + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/75882ec0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java index 5032611..1cc90e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java @@ -50,8 +50,6 @@ import org.junit.rules.ExpectedException; import java.io.IOException; import java.net.URI; import java.util.Iterator; -import java.util.List; -import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
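For anyone trying out the new subcommand, a plan invocation against this patch would look roughly like the following. This is a sketch only: the host names are hypothetical, while the option names (-uri, -plan, -node, -thresholdPercentage, -out, -v) come from the constants defined in DiskBalancer.java above.

  hdfs diskbalancer -uri hdfs://namenode.mycluster.com -plan -node datanode1.mycluster.com -thresholdPercentage 10 -v

Based on PlanCommand.execute() above, this reads the cluster state from the NameNode, computes a plan for datanode1.mycluster.com, and writes datanode1.mycluster.com.before.json (a snapshot of the cluster) and datanode1.mycluster.com.plan.json (the computed plan) under the timestamped default directory derived from /system/diskbalancer, unless -out supplies another location. With -v, the command also prints each step's source disk, destination disk, move size and storage type to the screen.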