Repository: storm Updated Branches: refs/heads/master a759db38d -> 4a9278630
STROM-1263: port backtype.storm.command.kill-topology to java (And add in better java CLI) Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b05aeb0e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b05aeb0e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b05aeb0e Branch: refs/heads/master Commit: b05aeb0eaadde8c919428bb2dbbffaa414b8470d Parents: 265ff91 Author: Robert (Bobby) Evans <[email protected]> Authored: Fri Feb 12 12:38:48 2016 -0600 Committer: Robert (Bobby) Evans <[email protected]> Committed: Fri Feb 12 12:38:48 2016 -0600 ---------------------------------------------------------------------- bin/storm.cmd | 14 +- bin/storm.py | 2 +- pom.xml | 6 + storm-core/pom.xml | 9 + .../org/apache/storm/command/kill_topology.clj | 29 --- .../src/jvm/org/apache/storm/command/CLI.java | 229 +++++++++++++++++++ .../org/apache/storm/command/KillTopology.java | 51 +++++ .../org/apache/storm/utils/NimbusClient.java | 19 +- 8 files changed, 321 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/bin/storm.cmd ---------------------------------------------------------------------- diff --git a/bin/storm.cmd b/bin/storm.cmd index 6f4e934..8b3fa92 100644 --- a/bin/storm.cmd +++ b/bin/storm.cmd @@ -145,7 +145,7 @@ :drpc set CLASS=org.apache.storm.daemon.drpc - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value drpc.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue drpc.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -160,7 +160,7 @@ goto :eof :kill - set CLASS=org.apache.storm.command.kill_topology + set CLASS=org.apache.storm.command.KillTopology set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof @@ -171,7 +171,7 @@ :logviewer set CLASS=org.apache.storm.daemon.logviewer - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value logviewer.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue logviewer.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -183,7 +183,7 @@ :nimbus set CLASS=org.apache.storm.daemon.nimbus - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value nimbus.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue nimbus.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -199,7 +199,7 @@ goto :eof :remoteconfvalue - set CLASS=org.apache.storm.command.config_value + set CLASS=org.apache.storm.command.ConfigValue set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof @@ -215,7 +215,7 @@ :supervisor set CLASS=org.apache.storm.daemon.supervisor - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value supervisor.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue supervisor.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -228,7 +228,7 @@ :ui set CLASS=org.apache.storm.ui.core set CLASSPATH=%CLASSPATH%;%STORM_HOME% - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value ui.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue ui.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py index f2aca95..48160cc 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -278,7 +278,7 @@ def kill(*args): print_usage(command="kill") sys.exit(2) exec_storm_class( - "org.apache.storm.command.kill_topology", + "org.apache.storm.command.KillTopology", args=args, jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 783018f..61a1ed9 100644 --- a/pom.xml +++ b/pom.xml @@ -199,6 +199,7 @@ <commons-exec.version>1.1</commons-exec.version> <commons-fileupload.version>1.2.1</commons-fileupload.version> <commons-codec.version>1.6</commons-codec.version> + <commons-cli.version>1.3.1</commons-cli.version> <clj-time.version>0.8.0</clj-time.version> <curator.version>2.9.0</curator.version> <json-simple.version>1.1</json-simple.version> @@ -492,6 +493,11 @@ <version>${kryo.version}</version> </dependency> <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>${commons-cli.version}</version> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>${commons-io.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 247d097..624e340 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -149,6 +149,10 @@ <!--java--> <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <scope>compile</scope> @@ -505,6 +509,7 @@ <include>org.apache.commons:commons-exec</include> <include>org.apache.commons:commons-compress</include> <include>org.apache.hadoop:hadoop-auth</include> + <include>commons-cli:commons-cli</include> <include>commons-io:commons-io</include> <include>commons-codec:commons-codec</include> <include>commons-fileupload:commons-fileupload</include> @@ -643,6 +648,10 @@ <shadedPattern>org.apache.storm.shade.com.metamx.http.client</shadedPattern> </relocation> <relocation> + <pattern>org.apache.commons.cli</pattern> + <shadedPattern>org.apache.storm.shade.org.apache.commons.cli</shadedPattern> + </relocation> + <relocation> <pattern>org.apache.commons.io</pattern> <shadedPattern>org.apache.storm.shade.org.apache.commons.io</shadedPattern> </relocation> http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/clj/org/apache/storm/command/kill_topology.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj deleted file mode 100644 index 84e0a64..0000000 --- a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj +++ /dev/null @@ -1,29 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.kill-topology - (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm thrift config log]) - (:import [org.apache.storm.generated KillOptions]) - (:gen-class)) - -(defn -main [& args] - (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]) - opts (KillOptions.)] - (if wait (.set_wait_secs opts wait)) - (with-configured-nimbus-connection nimbus - (.killTopologyWithOpts nimbus name opts) - (log-message "Killed topology: " name) - ))) http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/jvm/org/apache/storm/command/CLI.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java new file mode 100644 index 0000000..9813a3e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.List; + +import org.apache.commons.cli.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CLI { + private static final Logger LOG = LoggerFactory.getLogger(CLI.class); + private static class Opt { + final String s; + final String l; + final Object defaultValue; + final Parse parse; + final Assoc assoc; + public Opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { + this.s = s; + this.l = l; + this.defaultValue = defaultValue; + this.parse = parse == null ? AS_STRING : parse; + this.assoc = assoc == null ? LAST_WINS : assoc; + } + + public Object process(Object current, String value) { + return assoc.assoc(current, parse.parse(value)); + } + } + + private static class Arg { + final String name; + final Parse parse; + final Assoc assoc; + public Arg(String name, Parse parse, Assoc assoc) { + this.name = name; + this.parse = parse == null ? AS_STRING : parse; + this.assoc = assoc == null ? INTO_LIST : assoc; + } + + public Object process(Object current, String value) { + return assoc.assoc(current, parse.parse(value)); + } + } + + public interface Parse { + /** + * Parse a String to the type you want it to be. + * @param value the String to parse + * @return the parsed value + */ + public Object parse(String value); + } + + public static final Parse AS_INT = new Parse() { + @Override + public Object parse(String value) { + return Integer.valueOf(value); + } + }; + + public static final Parse AS_STRING = new Parse() { + @Override + public Object parse(String value) { + return value; + } + }; + + public interface Assoc { + /** + * Associate a value into somthing else + * @param current what to put value into, will be null if no values have been added yet. + * @param value what to add + * @return the result of combining the two + */ + public Object assoc(Object current, Object value); + } + + public static final Assoc LAST_WINS = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + return value; + } + }; + + public static final Assoc FIRST_WINS = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + return current == null ? value : current; + } + }; + + public static final Assoc INTO_LIST = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + if (current == null) { + current = new ArrayList<Object>(); + } + ((List<Object>)current).add(value); + return current; + } + }; + + public static class CLIBuilder { + private final ArrayList<Opt> opts = new ArrayList<>(); + private final ArrayList<Arg> args = new ArrayList<>(); + + public CLIBuilder opt(String s, String l, Object defaultValue) { + return opt(s, l, defaultValue, null, null); + } + + public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) { + return opt(s, l, defaultValue, parse, null); + } + + public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { + opts.add(new Opt(s, l, defaultValue, parse, assoc)); + return this; + } + + public CLIBuilder arg(String name) { + return arg(name, null, null); + } + + public CLIBuilder arg(String name, Assoc assoc) { + return arg(name, null, assoc); + } + + public CLIBuilder arg(String name, Parse parse) { + return arg(name, parse, null); + } + + public CLIBuilder arg(String name, Parse parse, Assoc assoc) { + args.add(new Arg(name, parse, assoc)); + return this; + } + + public Map<String, Object> parse(String[] rawArgs) throws Exception { + Options options = new Options(); + for (Opt opt: opts) { + options.addOption(Option.builder(opt.s).longOpt(opt.l).hasArg().build()); + } + DefaultParser parser = new DefaultParser(); + CommandLine cl = parser.parse(options, rawArgs); + HashMap<String, Object> ret = new HashMap<>(); + for (Opt opt: opts) { + Object current = null; + for (String val: cl.getOptionValues(opt.s)) { + current = opt.process(current, val); + } + if (current == null) { + current = opt.defaultValue; + } + ret.put(opt.s, current); + } + List<String> stringArgs = cl.getArgList(); + if (args.size() > stringArgs.size()) { + throw new RuntimeException("Wrong number of arguments at least "+args.size()+" expected, but only " + stringArgs.size() + " found"); + } + + int argIndex = 0; + int stringArgIndex = 0; + if (args.size() > 0) { + while (argIndex < args.size()) { + Arg arg = args.get(argIndex); + boolean isLastArg = (argIndex == (args.size() - 1)); + Object current = null; + int maxStringIndex = isLastArg ? stringArgs.size() : (stringArgIndex + 1); + for (;stringArgIndex < maxStringIndex; stringArgIndex++) { + current = arg.process(current, stringArgs.get(stringArgIndex)); + } + ret.put(arg.name, current); + argIndex++; + } + } else { + ret.put("ARGS", stringArgs); + } + return ret; + } + } + + public static CLIBuilder opt(String s, String l, Object defaultValue) { + return new CLIBuilder().opt(s, l, defaultValue); + } + + public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) { + return new CLIBuilder().opt(s, l, defaultValue, parse); + } + + public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { + return new CLIBuilder().opt(s, l, defaultValue, parse, assoc); + } + + public CLIBuilder arg(String name) { + return new CLIBuilder().arg(name); + } + + public CLIBuilder arg(String name, Assoc assoc) { + return new CLIBuilder().arg(name, assoc); + } + + public CLIBuilder arg(String name, Parse parse) { + return new CLIBuilder().arg(name, parse); + } + + public CLIBuilder arg(String name, Parse parse, Assoc assoc) { + return new CLIBuilder().arg(name, parse, assoc); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/jvm/org/apache/storm/command/KillTopology.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java new file mode 100644 index 0000000..8f4d323 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import java.util.Map; + +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.utils.NimbusClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KillTopology { + private static final Logger LOG = LoggerFactory.getLogger(KillTopology.class); + + public static void main(String [] args) throws Exception { + Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT) + .arg("TOPO", CLI.FIRST_WINS) + .parse(args); + final String name = (String)cl.get("TOPO"); + Integer wait = (Integer)cl.get("w"); + + final KillOptions opts = new KillOptions(); + if (wait != null) { + opts.set_wait_secs(wait); + } + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + nimbus.killTopologyWithOpts(name, opts); + LOG.info("Killed topology: {}", name); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java index f5bad6e..4c76b29 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java @@ -17,11 +17,11 @@ */ package org.apache.storm.utils; - import org.apache.storm.Config; import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.Nimbus; import org.apache.storm.generated.NimbusSummary; +import org.apache.storm.security.auth.ReqContext; import org.apache.storm.security.auth.ThriftClient; import org.apache.storm.security.auth.ThriftConnectionType; import com.google.common.collect.Lists; @@ -29,6 +29,7 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.security.Principal; import java.util.List; import java.util.Map; @@ -36,6 +37,22 @@ public class NimbusClient extends ThriftClient implements AutoCloseable { private Nimbus.Client _client; private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); + public interface WithNimbus { + public void run(Nimbus.Client client) throws Exception; + } + + public static void withConfiguredClient(WithNimbus cb) throws Exception { + withConfiguredClient(cb, ConfigUtils.readStormConfig()); + } + + public static void withConfiguredClient(WithNimbus cb, Map conf) throws Exception { + ReqContext context = ReqContext.context(); + Principal principal = context.principal(); + String user = principal == null ? null : principal.getName(); + try (NimbusClient client = getConfiguredClientAs(conf, user);) { + cb.run(client.getClient()); + } + } public static NimbusClient getConfiguredClient(Map conf) { return getConfiguredClientAs(conf, null);
