Repository: hadoop Updated Branches: refs/heads/HDFS-1312 a65c707c0 -> f9fa01362
HDFS-9546: DiskBalancer: Add Execute command. Contributed by Anu Engineer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f9fa0136 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f9fa0136 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f9fa0136 Branch: refs/heads/HDFS-1312 Commit: f9fa0136240d0559408322d0f03ee470ba8e20db Parents: a65c707 Author: Anu Engineer <aengin...@apache.org> Authored: Fri May 13 10:52:58 2016 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Fri May 13 10:52:58 2016 -0700 ---------------------------------------------------------------------- .../server/diskbalancer/command/Command.java | 18 ++- .../diskbalancer/command/ExecuteCommand.java | 119 +++++++++++++++++++ .../diskbalancer/command/PlanCommand.java | 22 ++-- .../apache/hadoop/hdfs/tools/DiskBalancer.java | 71 ++++++----- 4 files changed, 187 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9fa0136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java index 6522434..919d549 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java @@ -24,6 +24,7 @@ import org.apache.commons.cli.Option; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -81,8 +82,6 @@ public abstract class Command extends Configured { public Command(Configuration conf) { super(conf); // These arguments are valid for all commands. - addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " + - "file URI for cluster"); addValidCommandParameters(DiskBalancer.HELP, "Help for this command"); addValidCommandParameters("arg", ""); } @@ -348,10 +347,25 @@ public abstract class Command extends Configured { * @return OutputStream. */ protected FSDataOutputStream create(String fileName) throws IOException { + Preconditions.checkNotNull(fileName); + if(fs == null) { + fs = FileSystem.get(getConf()); + } return fs.create(new Path(this.diskBalancerLogs, fileName)); } /** + * Returns a InputStream to read data. + */ + protected FSDataInputStream open(String fileName) throws IOException { + Preconditions.checkNotNull(fileName); + if(fs == null) { + fs = FileSystem.get(getConf()); + } + return fs.open(new Path(fileName)); + } + + /** * Returns the output path where the plan and snapshot gets written. * * @return Path http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9fa0136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java new file mode 100644 index 0000000..1f7e81f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.command; + +import com.google.common.base.Preconditions; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException; +import org.apache.hadoop.hdfs.tools.DiskBalancer; +import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; + + +/** + * executes a given plan. + */ +public class ExecuteCommand extends Command { + + /** + * Constructs ExecuteCommand. + * + * @param conf - Configuration. + */ + public ExecuteCommand(Configuration conf) { + super(conf); + addValidCommandParameters(DiskBalancer.EXECUTE, "Executes a given plan."); + addValidCommandParameters(DiskBalancer.NODE, "Name of the target node."); + } + + /** + * Executes the Client Calls. + * + * @param cmd - CommandLine + */ + @Override + public void execute(CommandLine cmd) throws Exception { + LOG.info("Executing \"execute plan\" command"); + Preconditions.checkState(cmd.hasOption(DiskBalancer.EXECUTE)); + verifyCommandOptions(DiskBalancer.EXECUTE, cmd); + + String planFile = cmd.getOptionValue(DiskBalancer.EXECUTE); + Preconditions.checkArgument(planFile == null || planFile.isEmpty(), + "Invalid plan file specified."); + + String planData = null; + try (FSDataInputStream plan = open(planFile)) { + planData = IOUtils.toString(plan); + } + submitPlan(planData); + } + + /** + * Submits plan to a given data node. + * + * @param planData - PlanData Json String. + * @throws IOException + */ + private void submitPlan(String planData) throws IOException { + Preconditions.checkNotNull(planData); + NodePlan plan = readPlan(planData); + String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort(); + Preconditions.checkNotNull(dataNodeAddress); + ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress); + String planHash = DigestUtils.sha512Hex(planData); + try { + dataNode.submitDiskBalancerPlan(planHash, DiskBalancer.PLAN_VERSION, + planData, false); // TODO : Support skipping date check. + } catch (DiskBalancerException ex) { + LOG.error("Submitting plan on {} failed. Result: {}, Message: {}", + plan.getNodeName(), ex.getResult().toString(), ex.getMessage()); + throw ex; + } + } + + /** + * Returns a plan from the Json Data. + * + * @param planData - Json String + * @return NodePlan + * @throws IOException + */ + private NodePlan readPlan(String planData) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(planData, NodePlan.class); + } + + /** + * Gets extended help for this command. + * + * @return Help Message + */ + @Override + protected String getHelp() { + return "Execute command takes a plan and runs it against the node. e.g. " + + "hdfs diskbalancer -execute <nodename.plan.json> "; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9fa0136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java index 2422215..d346c84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java @@ -24,7 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.tools.DiskBalancer; -import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel + .DiskBalancerDataNode; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; import org.codehaus.jackson.map.ObjectMapper; @@ -54,6 +55,9 @@ public class PlanCommand extends Command { this.thresholdPercentage = 1; this.bandwidth = 0; this.maxError = 0; + addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " + + "file URI for cluster"); + addValidCommandParameters(DiskBalancer.OUTFILE, "Output file"); addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " + "be used while copying."); @@ -61,7 +65,6 @@ public class PlanCommand extends Command { "we tolerate before diskbalancer starts working."); addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " + "between 2 disks"); - addValidCommandParameters(DiskBalancer.NODE, "Name / Address of the node."); addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " + "verbose mode."); } @@ -79,7 +82,7 @@ public class PlanCommand extends Command { Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN)); verifyCommandOptions(DiskBalancer.PLAN, cmd); - if (!cmd.hasOption(DiskBalancer.NODE)) { + if (cmd.getOptionValue(DiskBalancer.PLAN) == null) { throw new IllegalArgumentException("A node name is required to create a" + " plan."); } @@ -101,10 +104,11 @@ public class PlanCommand extends Command { } setOutputPath(output); - DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.NODE)); + // -plan nodename is the command line argument. + DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.PLAN)); if (node == null) { throw new IllegalArgumentException("Unable to find the specified node. " + - cmd.getOptionValue(DiskBalancer.NODE)); + cmd.getOptionValue(DiskBalancer.PLAN)); } this.thresholdPercentage = getThresholdPercentage(cmd); setNodesToProcess(node); @@ -115,16 +119,16 @@ public class PlanCommand extends Command { LOG.info("Writing plan to : {}", getOutputPath()); System.out.printf("Writing plan to : %s%n", getOutputPath()); - try(FSDataOutputStream beforeStream = create(String.format( + try (FSDataOutputStream beforeStream = create(String.format( DiskBalancer.BEFORE_TEMPLATE, - cmd.getOptionValue(DiskBalancer.NODE)))) { + cmd.getOptionValue(DiskBalancer.PLAN)))) { beforeStream.write(getCluster().toJson() .getBytes(StandardCharsets.UTF_8)); } - try(FSDataOutputStream planStream = create(String.format( + try (FSDataOutputStream planStream = create(String.format( DiskBalancer.PLAN_TEMPLATE, - cmd.getOptionValue(DiskBalancer.NODE)))) { + cmd.getOptionValue(DiskBalancer.PLAN)))) { planStream.write(getPlan(plans).getBytes(StandardCharsets.UTF_8)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9fa0136/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java index e44d3dc..87fbf4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.diskbalancer.command.Command; +import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand; import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -48,15 +49,6 @@ import java.net.URISyntaxException; */ public class DiskBalancer extends Configured implements Tool { /** - * Construct a DiskBalancer. - * - * @param conf - */ - public DiskBalancer(Configuration conf) { - super(conf); - } - - /** * NameNodeURI can point to either a real namenode, or a json file that * contains the diskBalancer data in json form, that jsonNodeConnector knows * how to deserialize. @@ -66,12 +58,10 @@ public class DiskBalancer extends Configured implements Tool { * hdfs://namenode.uri or file:///data/myCluster.json */ public static final String NAMENODEURI = "uri"; - /** * Computes a plan for a given set of nodes. */ public static final String PLAN = "plan"; - /** * Output file name, for commands like report, plan etc. This is an optional * argument, by default diskbalancer will write all its output to @@ -79,53 +69,58 @@ public class DiskBalancer extends Configured implements Tool { * against. */ public static final String OUTFILE = "out"; - /** * Help for the program. */ public static final String HELP = "help"; - /** * Percentage of data unevenness that we are willing to live with. For example * - a value like 10 indicates that we are okay with 10 % +/- from * idealStorage Target. */ public static final String THRESHOLD = "thresholdPercentage"; - /** * Specifies the maximum disk bandwidth to use per second. */ public static final String BANDWIDTH = "bandwidth"; - /** * Specifies the maximum errors to tolerate. */ public static final String MAXERROR = "maxerror"; - /** - * Node name or IP against which Disk Balancer is being run. + * Executes a given plan file on the target datanode. + */ + public static final String EXECUTE = "execute"; + /** + * Name or address of the node to execute against. */ public static final String NODE = "node"; - /** * Runs the command in verbose mode. */ public static final String VERBOSE = "v"; - + public static final int PLAN_VERSION = 1; /** * Template for the Before File. It is node.before.json. */ public static final String BEFORE_TEMPLATE = "%s.before.json"; - /** * Template for the plan file. it is node.plan.json. */ public static final String PLAN_TEMPLATE = "%s.plan.json"; - private static final Logger LOG = LoggerFactory.getLogger(DiskBalancer.class); /** + * Construct a DiskBalancer. + * + * @param conf + */ + public DiskBalancer(Configuration conf) { + super(conf); + } + + /** * Main for the DiskBalancer Command handling. * * @param argv - System Args Strings[] @@ -164,16 +159,16 @@ public class DiskBalancer extends Configured implements Tool { */ private Options getOpts() { Options opts = new Options(); - addCommands(opts); + addPlanCommands(opts); return opts; } /** - * Adds commands that we handle to opts. + * Adds commands for plan command. * - * @param opt - Optins + * @param opt - Options */ - private void addCommands(Options opt) { + private void addPlanCommands(Options opt) { Option nameNodeUri = new Option(NAMENODEURI, true, "NameNode URI. e.g http://namenode" + @@ -187,7 +182,8 @@ public class DiskBalancer extends Configured implements Tool { "e.g -out outfile.txt"); opt.addOption(outFile); - Option plan = new Option(PLAN, false, "write plan to the default file"); + Option plan = new Option(PLAN, true , "create a plan for the given node. " + + "e.g -plan <nodename> | <nodeIP> | <nodeUUID>"); opt.addOption(plan); Option bandwidth = new Option(BANDWIDTH, true, "Maximum disk bandwidth to" + @@ -204,13 +200,19 @@ public class DiskBalancer extends Configured implements Tool { "can be tolerated while copying between a pair of disks."); opt.addOption(maxErrors); - Option node = new Option(NODE, true, "Node Name or IP"); - opt.addOption(node); - Option help = new Option(HELP, true, "Help about a command or this message"); opt.addOption(help); + } + /** + * Adds execute command options. + * @param opt Options + */ + private void addExecuteCommands(Options opt) { + Option execute = new Option(EXECUTE, true , "Takes a plan file and " + + "submits it for execution to the datanode. e.g -execute <planfile>"); + opt.addOption(execute); } /** @@ -238,18 +240,23 @@ public class DiskBalancer extends Configured implements Tool { Command currentCommand = null; try { + if (cmd.hasOption(DiskBalancer.PLAN)) { currentCommand = new PlanCommand(getConf()); - } else { + } + + if(cmd.hasOption(DiskBalancer.EXECUTE)) { + currentCommand = new ExecuteCommand(getConf()); + } + + if(currentCommand == null) { HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]", "disk balancer commands", opts, "Please correct your command and try again."); return 1; } - currentCommand.execute(cmd); - } catch (Exception ex) { System.err.printf(ex.getMessage()); return 1; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org