Github user govind-menon commented on a diff in the pull request:
https://github.com/apache/storm/pull/2930#discussion_r247197760
--- Diff: bin/storm.py ---
@@ -296,787 +239,1044 @@ def exec_storm_class(klass, jvmtype="-server",
jvmopts=[], extrajars=[], args=[]
elif is_windows():
# handling whitespaces in JAVA_CMD
try:
- ret = sub.check_output(all_args, stderr=sub.STDOUT)
+ ret = subprocess.check_output(all_args,
stderr=subprocess.STDOUT)
print(ret)
- except sub.CalledProcessError as e:
+ except subprocess.CalledProcessError as e:
print(e.output)
sys.exit(e.returncode)
else:
os.execvp(JAVA_CMD, all_args)
return exit_code
-def run_client_jar(jarfile, klass, args, daemon=False, client=True,
extrajvmopts=[]):
- global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS,
DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY,
DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD
- local_jars = DEP_JARS_OPTS
- artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS,
DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY,
DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD)
+def run_client_jar(klass, args, daemon=False, client=True,
extrajvmopts=[]):
+ local_jars = args.jars.split(",")
+ jarfile = args.topology_jar_path
+
+ artifact_to_file_jars = resolve_dependencies(
+ args.artifacts, args.artifactRepositories,
+ args.mavenLocalRepositoryDirectory, args.proxyUrl,
+ args.proxyUsername, args.proxyPassword
+ )
- extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR]
+ extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR]
extra_jars.extend(local_jars)
extra_jars.extend(artifact_to_file_jars.values())
exec_storm_class(
- klass,
+ klass, args.c,
jvmtype="-client",
extrajars=extra_jars,
- args=args,
+ args=args.topology_main_args,
daemon=False,
jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
["-Dstorm.dependency.artifacts=" +
json.dumps(artifact_to_file_jars)])
-def local(jarfile, klass, *args):
- """Syntax: [storm local topology-jar-path class ...]
- Runs the main method of class with the specified arguments but
pointing to a local cluster
- The storm jars and configs in ~/.storm are put on the classpath.
- The process is configured so that StormSubmitter
-
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
- and others will interact with a local cluster instead of the one
configured by default.
+def print_localconfvalue(args):
+ print(args.conf_name + ": " + confvalue(args.conf_name, args.c,
[USER_CONF_DIR]))
+
+
+def print_remoteconfvalue(args):
+ print(args.conf_name + ": " + confvalue(args.conf_name, args.c,
[CLUSTER_CONF_DIR]))
+
+
+def initialize_main_command():
+ main_parser = argparse.ArgumentParser(prog="storm")
+ main_parser.add_argument("--config", default=None, help="Override
default storm conf")
+ main_parser.add_argument("-c", action="append", default=[],
help="Override storm conf properties")
+
+ subparsers = main_parser.add_subparsers(help="")
+
+ initialize_jar_subcommand(subparsers)
+ initialize_localconfvalue_subcommand(subparsers)
+ initialize_remoteconfvalue_subcommand(subparsers)
+ initialize_local_subcommand(subparsers)
+ initialize_sql_subcommand(subparsers)
+ initialize_kill_subcommand(subparsers)
+ initialize_upload_credentials_subcommand(subparsers)
+ initialize_blobstore_subcommand(subparsers)
+ initialize_heartbeats_subcommand(subparsers)
+ initialize_activate_subcommand(subparsers)
+ initialize_listtopos_subcommand(subparsers)
+ initialize_deactivate_subcommand(subparsers)
+ initialize_rebalance_subcommand(subparsers)
+ initialize_get_errors_subcommand(subparsers)
+ initialize_healthcheck_subcommand(subparsers)
+ initialize_kill_workers_subcommand(subparsers)
+ initialize_admin_subcommand(subparsers)
+ initialize_shell_subcommand(subparsers)
+ initialize_repl_subcommand(subparsers)
+ initialize_nimbus_subcommand(subparsers)
+ initialize_pacemaker_subcommand(subparsers)
+ initialize_supervisor_subcommand(subparsers)
+ initialize_ui_subcommand(subparsers)
+ initialize_logviewer_subcommand(subparsers)
+ initialize_drpc_client_subcommand(subparsers)
+ initialize_drpc_subcommand(subparsers)
+ initialize_dev_zookeeper_subcommand(subparsers)
+ initialize_version_subcommand(subparsers)
+ initialize_classpath_subcommand(subparsers)
+ initialize_server_classpath_subcommand(subparsers)
+ initialize_monitor_subcommand(subparsers)
+
+ return main_parser
+
+
+def initialize_localconfvalue_subcommand(subparsers):
+ command_help = '''Prints out the value for conf-name in the local
Storm configs.
+ The local Storm configs are the ones in ~/.storm/storm.yaml merged
+ in with the configs in defaults.yaml.'''
- Most options should work just like with the storm jar command.
+ sub_parser = subparsers.add_parser("localconfvalue", help=command_help)
+ sub_parser.add_argument("conf_name")
+ sub_parser.set_defaults(func=print_localconfvalue)
- local also adds in the option --local-ttl which sets the number of
seconds the
- local cluster will run for before it shuts down.
- --local-zookeeper if using an external zookeeper sets the connection
string to use for it.
+def initialize_remoteconfvalue_subcommand(subparsers):
+ command_help = '''Prints out the value for conf-name in the cluster's
Storm configs.
+ The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
+ merged in with the configs in defaults.yaml.
- --java-debug lets you turn on java debugging and set the parameters
passed to -agentlib:jdwp on the JDK
- --java-debug transport=dt_socket,address=localhost:8000
- will open up a debugging server on port 8000.
- """
- [ttl, lzk, debug_args, args] = parse_local_opts(args)
- extrajvmopts = ["-Dstorm.local.sleeptime=" + ttl]
- if lzk != None:
- extrajvmopts = extrajvmopts + ["-Dstorm.local.zookeeper=" + lzk]
- if debug_args != None:
- extrajvmopts = extrajvmopts + ["-agentlib:jdwp=" + debug_args]
- run_client_jar(jarfile, "org.apache.storm.LocalCluster", [klass] +
list(args), client=False, daemon=False, extrajvmopts=extrajvmopts)
-
-def jar(jarfile, klass, *args):
- """Syntax: [storm jar topology-jar-path class ...]
-
- Runs the main method of class with the specified arguments.
- The storm worker dependencies and configs in ~/.storm are put on the
classpath.
- The process is configured so that StormSubmitter
-
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
- will upload the jar at topology-jar-path when the topology is
submitted.
+ This command must be run on a cluster machine.'''
+
+ sub_parser = subparsers.add_parser("remoteconfvalue",
help=command_help)
+ sub_parser.add_argument("conf_name")
+ sub_parser.set_defaults(func=print_remoteconfvalue)
+
+
+def add_topology_jar_options(parser):
+ parser.add_argument("topology_jar_path", help="will upload the jar at
topology-jar-path when the topology is submitted.")
+ parser.add_argument("topology_main_class", help="main class of the
topology jar being submitted")
+ parser.add_argument("topology_main_args", nargs='*', help="Runs the
main method with the specified arguments.")
+
+
+def add_client_jar_options(parser):
- When you want to ship other jars which is not included to application
jar, you can pass them to --jars option with comma-separated string.
+ parser.add_argument("--jars", help='''
+ When you want to ship other jars which are not included to application
jar, you can pass them to --jars option with comma-separated string.
For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load
your-local-jar.jar and your-local-jar2.jar.
- And when you want to ship maven artifacts and its transitive
dependencies, you can pass them to --artifacts with comma-separated string.
+ ''', default="")
+
+ parser.add_argument("--artifacts", help='''
+ When you want to ship maven artifacts and its transitive
dependencies, you can pass them to --artifacts with comma-separated string.
You can also exclude some dependencies like what you're doing in maven
pom.
Please add exclusion artifacts with '^' separated string after the
artifact.
For example, -artifacts
"redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api"
will load jedis and kafka-clients artifact and all of transitive dependencies
but exclude slf4j-api from kafka.
+ ''', default="")
+
- When you need to pull the artifacts from other than Maven Central, you
can pass remote repositories to --artifactRepositories option with
comma-separated string.
+ parser.add_argument("--artifactRepositories", help='''
+ When you need to pull the artifacts from other than Maven Central, you
can pass remote repositories to --artifactRepositories option with a
comma-separated string.
Repository format is "<name>^<url>". '^' is taken as separator because
URL allows various characters.
For example, --artifactRepositories
"jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
will add JBoss and HDP repositories for dependency resolver.
- You can provide local maven repository directory via
--mavenLocalRepositoryDirectory if you would like to use specific directory. It
might help when you don't have '.m2/repository' directory in home directory,
because CWD is sometimes non-deterministic (fragile).
+ ''', default="")
+
+ parser.add_argument("--mavenLocalRepositoryDirectory", help="You can
provide local maven repository directory via --mavenLocalRepositoryDirectory if
you would like to use specific directory. It might help when you don't have
'.m2/repository' directory in home directory, because CWD is sometimes
non-deterministic (fragile).", default="")
+
- You can also provide proxy information to let dependency resolver
utilizing proxy if needed. There're three parameters for proxy:
- --proxyUrl: URL representation of proxy ('http://host:port')
- --proxyUsername: username of proxy if it requires basic auth
- --proxyPassword: password of proxy if it requires basic auth
+ parser.add_argument("--proxyUrl", help="You can also provide proxy
information to let dependency resolver utilizing proxy if needed. URL
representation of proxy ('http://host:port')", default="")
+ parser.add_argument("--proxyUsername", help="username of proxy if it
requires basic auth", default="")
+ parser.add_argument("--proxyPassword", help="password of proxy if it
requires basic auth", default="")
- Complete example of options is here: `./bin/storm jar
example/storm-starter/storm-starter-topologies-*.jar
org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars
"./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"
--artifacts
"redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api"
--artifactRepositories
"jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+
+def initialize_jar_subcommand(subparsers):
+ jar_help = """Runs the main method of class with the specified
arguments.
+ The storm worker dependencies and configs in ~/.storm are put on the
classpath.
+ The process is configured so that StormSubmitter
+
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
+ will upload the jar at topology-jar-path when the topology is
submitted.
When you pass jars and/or artifacts options, StormSubmitter will
upload them when the topology is submitted, and they will be included to
classpath of both the process which runs the class, and also workers for that
topology.
+ """
+ jar_parser = subparsers.add_parser("jar", help=jar_help)
+
+ add_topology_jar_options(jar_parser)
+ add_client_jar_options(jar_parser)
+
+ jar_parser.add_argument(
+ "--storm-server-classpath",
+ action='store_true',
+ help='''
+ If for some reason you need to have the full storm classpath,
+ not just the one for the worker you may include the command line
option `--storm-server-classpath`.
+ Please be careful because this will add things to the classpath
+ that will not be on the worker classpath
+ and could result in the worker not running.'''
+ )
+
+ jar_parser.set_defaults(func=jar)
+
+
+def initialize_local_subcommand(subparsers):
+ command_help = """Runs the main method of class with the specified
arguments but pointing to a local cluster
+ The storm jars and configs in ~/.storm are put on the classpath.
+ The process is configured so that StormSubmitter
+
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
+ and others will interact with a local cluster instead of the one
configured by default.
+
+ Most options should work just like with the storm jar command.
+ """
+ sub_parser = subparsers.add_parser("local", help=command_help)
+
+ add_topology_jar_options(sub_parser)
+ add_client_jar_options(sub_parser)
+
+ sub_parser.add_argument(
+ "--local-ttl",
+ help="sets the number of seconds the local cluster will run for
before it shuts down",
+ default=LOCAL_TTL_DEFAULT
+ )
+
+ sub_parser.add_argument(
+ "--java-debug",
+ help="lets you turn on java debugging and set the parameters
passed to -agentlib:jdwp on the JDK" +
+ "e.g transport=dt_socket,address=localhost:8000 will open up
a debugging server on port 8000",
+ default=None
+ )
+
+ sub_parser.set_defaults(func=local)
+
+
+def initialize_kill_subcommand(subparsers):
+ command_help = """Kills the topology with the name topology-name.
Storm will
+ first deactivate the topology's spouts for the duration of
+ the topology's message timeout to allow all messages currently
+ being processed to finish processing. Storm will then shutdown
+ the workers and clean up their state.
+ """
+ sub_parser = subparsers.add_parser("kill", help=command_help)
+
+ sub_parser.add_argument("topology-name")
+
+ sub_parser.add_argument(
+ "-w", "--wait-time-secs",
+ help="""override the length of time Storm waits between
deactivation and shutdown""",
+ default=None
+ )
+
+ sub_parser.set_defaults(func=kill)
+
+
+def check_positive(value):
+ ivalue = int(value)
+ if ivalue <= 0:
+ raise argparse.ArgumentTypeError("%s is not a positive integer" %
value)
+ return ivalue
+
+def check_even_list(cred_list):
+ if not (len(cred_list) % 2):
+ raise argparse.ArgumentTypeError("please provide a list of cred
key and value pairs")
+ return cred_list
+
+
+def initialize_upload_credentials_subcommand(subparsers):
+ command_help = """Uploads a new set of credentials to a running
topology."""
+ sub_parser = subparsers.add_parser("upload-credentials",
help=command_help)
+
+ sub_parser.add_argument("topology-name")
+
+ sub_parser.add_argument(
+ "-f", "--file", default=None,
+ help="""provide a properties file with credentials in it to be
uploaded"""
+ )
+
+ sub_parser.add_argument(
+ "-u", "--user", default=None,
+ help="""name of the owner of the topology (security precaution)"""
+ )
+
+ sub_parser.add_argument(
+ "cred_list", nargs='*', help="List of credkeys and their values
[credkey credvalue]*",
+ type=check_even_list
+ )
+
+ sub_parser.set_defaults(func=upload_credentials)
+
+
+def initialize_sql_subcommand(subparsers):
+ command_help = """Compiles the SQL statements into a Trident topology
and submits it to Storm.
+ If user activates explain mode, SQL Runner analyzes each query
statement
+ and shows query plan instead of submitting topology.
+ """
+
+ sub_parser = subparsers.add_parser("sql", help=command_help)
+
+ add_client_jar_options(sub_parser)
+
+ sub_parser.add_argument("sql_file", metavar="sql-file")
+ sub_parser.add_argument(
+ "topology_name", metavar="topology-name", help="should be
--explain to activate explain mode"
--- End diff --
There are positional arguments added on L520 that don't allow mutual
exclusion
---