GitHub user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2930#discussion_r247197760
  
    --- Diff: bin/storm.py ---
    @@ -296,787 +239,1044 @@ def exec_storm_class(klass, jvmtype="-server", 
jvmopts=[], extrajars=[], args=[]
         elif is_windows():
             # handling whitespaces in JAVA_CMD
             try:
    -            ret = sub.check_output(all_args, stderr=sub.STDOUT)
    +            ret = subprocess.check_output(all_args, 
stderr=subprocess.STDOUT)
                 print(ret)
    -        except sub.CalledProcessError as e:
    +        except subprocess.CalledProcessError as e:
                 print(e.output)
                 sys.exit(e.returncode)
         else:
             os.execvp(JAVA_CMD, all_args)
         return exit_code
     
    -def run_client_jar(jarfile, klass, args, daemon=False, client=True, 
extrajvmopts=[]):
    -    global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, 
DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, 
DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD
     
    -    local_jars = DEP_JARS_OPTS
    -    artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS, 
DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, 
DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD)
    +def run_client_jar(klass, args, daemon=False, client=True, 
extrajvmopts=[]):
    +    local_jars = args.jars.split(",")
    +    jarfile = args.topology_jar_path
    +
    +    artifact_to_file_jars = resolve_dependencies(
    +        args.artifacts, args.artifactRepositories,
    +        args.mavenLocalRepositoryDirectory, args.proxyUrl,
    +        args.proxyUsername, args.proxyPassword
    +    )
     
    -    extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR]
    +    extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR]
         extra_jars.extend(local_jars)
         extra_jars.extend(artifact_to_file_jars.values())
         exec_storm_class(
    -        klass,
    +        klass, args.c,
             jvmtype="-client",
             extrajars=extra_jars,
    -        args=args,
    +        args=args.topology_main_args,
             daemon=False,
             jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
                     ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
                     ["-Dstorm.dependency.artifacts=" + 
json.dumps(artifact_to_file_jars)])
     
    -def local(jarfile, klass, *args):
    -    """Syntax: [storm local topology-jar-path class ...]
     
    -    Runs the main method of class with the specified arguments but 
pointing to a local cluster
    -    The storm jars and configs in ~/.storm are put on the classpath.
    -    The process is configured so that StormSubmitter
    -    
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
    -    and others will interact with a local cluster instead of the one 
configured by default.
    +def print_localconfvalue(args):
    +    print(args.conf_name + ": " + confvalue(args.conf_name, args.c, 
[USER_CONF_DIR]))
    +
    +
    +def print_remoteconfvalue(args):
    +    print(args.conf_name + ": " + confvalue(args.conf_name, args.c, 
[CLUSTER_CONF_DIR]))
    +
    +
    +def initialize_main_command():
    +    main_parser = argparse.ArgumentParser(prog="storm")
    +    main_parser.add_argument("--config", default=None, help="Override 
default storm conf")
    +    main_parser.add_argument("-c", action="append", default=[], 
help="Override storm conf properties")
    +
    +    subparsers = main_parser.add_subparsers(help="")
    +
    +    initialize_jar_subcommand(subparsers)
    +    initialize_localconfvalue_subcommand(subparsers)
    +    initialize_remoteconfvalue_subcommand(subparsers)
    +    initialize_local_subcommand(subparsers)
    +    initialize_sql_subcommand(subparsers)
    +    initialize_kill_subcommand(subparsers)
    +    initialize_upload_credentials_subcommand(subparsers)
    +    initialize_blobstore_subcommand(subparsers)
    +    initialize_heartbeats_subcommand(subparsers)
    +    initialize_activate_subcommand(subparsers)
    +    initialize_listtopos_subcommand(subparsers)
    +    initialize_deactivate_subcommand(subparsers)
    +    initialize_rebalance_subcommand(subparsers)
    +    initialize_get_errors_subcommand(subparsers)
    +    initialize_healthcheck_subcommand(subparsers)
    +    initialize_kill_workers_subcommand(subparsers)
    +    initialize_admin_subcommand(subparsers)
    +    initialize_shell_subcommand(subparsers)
    +    initialize_repl_subcommand(subparsers)
    +    initialize_nimbus_subcommand(subparsers)
    +    initialize_pacemaker_subcommand(subparsers)
    +    initialize_supervisor_subcommand(subparsers)
    +    initialize_ui_subcommand(subparsers)
    +    initialize_logviewer_subcommand(subparsers)
    +    initialize_drpc_client_subcommand(subparsers)
    +    initialize_drpc_subcommand(subparsers)
    +    initialize_dev_zookeeper_subcommand(subparsers)
    +    initialize_version_subcommand(subparsers)
    +    initialize_classpath_subcommand(subparsers)
    +    initialize_server_classpath_subcommand(subparsers)
    +    initialize_monitor_subcommand(subparsers)
    +
    +    return main_parser
    +
    +
    +def initialize_localconfvalue_subcommand(subparsers):
    +    command_help = '''Prints out the value for conf-name in the local 
Storm configs.
    +    The local Storm configs are the ones in ~/.storm/storm.yaml merged
    +    in with the configs in defaults.yaml.'''
     
    -    Most options should work just like with the storm jar command.
    +    sub_parser = subparsers.add_parser("localconfvalue", help=command_help)
    +    sub_parser.add_argument("conf_name")
    +    sub_parser.set_defaults(func=print_localconfvalue)
     
    -    local also adds in the option --local-ttl which sets the number of 
seconds the
    -    local cluster will run for before it shuts down.
     
    -    --local-zookeeper if using an external zookeeper sets the connection 
string to use for it.
    +def initialize_remoteconfvalue_subcommand(subparsers):
    +    command_help = '''Prints out the value for conf-name in the cluster's 
Storm configs.
    +    The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
    +    merged in with the configs in defaults.yaml.
     
    -    --java-debug lets you turn on java debugging and set the parameters 
passed to -agentlib:jdwp on the JDK
    -    --java-debug transport=dt_socket,address=localhost:8000
    -    will open up a debugging server on port 8000.
    -    """
    -    [ttl, lzk, debug_args, args] = parse_local_opts(args)
    -    extrajvmopts = ["-Dstorm.local.sleeptime=" + ttl]
    -    if lzk != None:
    -        extrajvmopts = extrajvmopts + ["-Dstorm.local.zookeeper=" + lzk]
    -    if debug_args != None:
    -        extrajvmopts = extrajvmopts + ["-agentlib:jdwp=" + debug_args]
    -    run_client_jar(jarfile, "org.apache.storm.LocalCluster", [klass] + 
list(args), client=False, daemon=False, extrajvmopts=extrajvmopts)
    -
    -def jar(jarfile, klass, *args):
    -    """Syntax: [storm jar topology-jar-path class ...]
    -
    -    Runs the main method of class with the specified arguments.
    -    The storm worker dependencies and configs in ~/.storm are put on the 
classpath.
    -    The process is configured so that StormSubmitter
    -    
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
    -    will upload the jar at topology-jar-path when the topology is 
submitted.
    +    This command must be run on a cluster machine.'''
    +
    +    sub_parser = subparsers.add_parser("remoteconfvalue", 
help=command_help)
    +    sub_parser.add_argument("conf_name")
    +    sub_parser.set_defaults(func=print_remoteconfvalue)
    +
    +
    +def add_topology_jar_options(parser):
    +    parser.add_argument("topology_jar_path", help="will upload the jar at 
topology-jar-path when the topology is submitted.")
    +    parser.add_argument("topology_main_class", help="main class of the 
topology jar being submitted")
    +    parser.add_argument("topology_main_args", nargs='*', help="Runs the 
main method with the specified arguments.")
    +
    +
    +def add_client_jar_options(parser):
     
    -    When you want to ship other jars which is not included to application 
jar, you can pass them to --jars option with comma-separated string.
    +    parser.add_argument("--jars", help='''
    +    When you want to ship other jars which are not included to application 
jar, you can pass them to --jars option with comma-separated string.
         For example, --jars "your-local-jar.jar,your-local-jar2.jar" will load 
your-local-jar.jar and your-local-jar2.jar.
    -    And when you want to ship maven artifacts and its transitive 
dependencies, you can pass them to --artifacts with comma-separated string.
    +    ''', default="")
    +
    +    parser.add_argument("--artifacts", help='''
    +     When you want to ship maven artifacts and its transitive 
dependencies, you can pass them to --artifacts with comma-separated string.
         You can also exclude some dependencies like what you're doing in maven 
pom.
         Please add exclusion artifacts with '^' separated string after the 
artifact.
         For example, -artifacts 
"redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api"
 will load jedis and kafka-clients artifact and all of transitive dependencies 
but exclude slf4j-api from kafka.
    +        ''', default="")
    +
     
    -    When you need to pull the artifacts from other than Maven Central, you 
can pass remote repositories to --artifactRepositories option with 
comma-separated string.
    +    parser.add_argument("--artifactRepositories", help='''
    +    When you need to pull the artifacts from other than Maven Central, you 
can pass remote repositories to --artifactRepositories option with a 
comma-separated string.
         Repository format is "<name>^<url>". '^' is taken as separator because 
URL allows various characters.
         For example, --artifactRepositories 
"jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/";
 will add JBoss and HDP repositories for dependency resolver.
    -    You can provide local maven repository directory via 
--mavenLocalRepositoryDirectory if you would like to use specific directory. It 
might help when you don't have '.m2/repository' directory in home directory, 
because CWD is sometimes non-deterministic (fragile).
    +    ''', default="")
    +
    +    parser.add_argument("--mavenLocalRepositoryDirectory", help="You can 
provide local maven repository directory via --mavenLocalRepositoryDirectory if 
you would like to use specific directory. It might help when you don't have 
'.m2/repository' directory in home directory, because CWD is sometimes 
non-deterministic (fragile).", default="")
    +
     
    -    You can also provide proxy information to let dependency resolver 
utilizing proxy if needed. There're three parameters for proxy:
    -    --proxyUrl: URL representation of proxy ('http://host:port')
    -    --proxyUsername: username of proxy if it requires basic auth
    -    --proxyPassword: password of proxy if it requires basic auth
    +    parser.add_argument("--proxyUrl", help="You can also provide proxy 
information to let dependency resolver utilizing proxy if needed. URL 
representation of proxy ('http://host:port')", default="")
    +    parser.add_argument("--proxyUsername", help="username of proxy if it 
requires basic auth", default="")
    +    parser.add_argument("--proxyPassword", help="password of proxy if it 
requires basic auth", default="")
     
    -    Complete example of options is here: `./bin/storm jar 
example/storm-starter/storm-starter-topologies-*.jar 
org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars 
"./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"
 --artifacts 
"redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api"
 --artifactRepositories 
"jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
    +
    +def initialize_jar_subcommand(subparsers):
    +    jar_help = """Runs the main method of class with the specified 
arguments.
    +    The storm worker dependencies and configs in ~/.storm are put on the 
classpath.
    +    The process is configured so that StormSubmitter
    +    
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
    +    will upload the jar at topology-jar-path when the topology is 
submitted.
     
         When you pass jars and/or artifacts options, StormSubmitter will 
upload them when the topology is submitted, and they will be included to 
classpath of both the process which runs the class, and also workers for that 
topology.
    +    """
    +    jar_parser = subparsers.add_parser("jar", help=jar_help)
    +
    +    add_topology_jar_options(jar_parser)
    +    add_client_jar_options(jar_parser)
    +
    +    jar_parser.add_argument(
    +        "--storm-server-classpath",
    +        action='store_true',
    +        help='''
    +        If for some reason you need to have the full storm classpath,
    +        not just the one for the worker you may include the command line 
option `--storm-server-classpath`.
    +        Please be careful because this will add things to the classpath
    +        that will not be on the worker classpath
    +        and could result in the worker not running.'''
    +    )
    +
    +    jar_parser.set_defaults(func=jar)
    +
    +
    +def initialize_local_subcommand(subparsers):
    +    command_help = """Runs the main method of class with the specified 
arguments but pointing to a local cluster
    +    The storm jars and configs in ~/.storm are put on the classpath.
    +    The process is configured so that StormSubmitter
    +    
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
    +    and others will interact with a local cluster instead of the one 
configured by default.
    +
    +    Most options should work just like with the storm jar command.
    +    """
    +    sub_parser = subparsers.add_parser("local", help=command_help)
    +
    +    add_topology_jar_options(sub_parser)
    +    add_client_jar_options(sub_parser)
    +
    +    sub_parser.add_argument(
    +        "--local-ttl",
    +        help="sets the number of seconds the local cluster will run for 
before it shuts down",
    +        default=LOCAL_TTL_DEFAULT
    +    )
    +
    +    sub_parser.add_argument(
    +        "--java-debug",
    +        help="lets you turn on java debugging and set the parameters 
passed to -agentlib:jdwp on the JDK" +
    +             "e.g transport=dt_socket,address=localhost:8000 will open up 
a debugging server on port 8000",
    +        default=None
    +    )
    +
    +    sub_parser.set_defaults(func=local)
    +
    +
    +def initialize_kill_subcommand(subparsers):
    +    command_help = """Kills the topology with the name topology-name. 
Storm will
    +    first deactivate the topology's spouts for the duration of
    +    the topology's message timeout to allow all messages currently
    +    being processed to finish processing. Storm will then shutdown
    +    the workers and clean up their state.
    +    """
    +    sub_parser = subparsers.add_parser("kill", help=command_help)
    +
    +    sub_parser.add_argument("topology-name")
    +
    +    sub_parser.add_argument(
    +        "-w", "--wait-time-secs",
    +        help="""override the length of time Storm waits between 
deactivation and shutdown""",
    +        default=None
    +    )
    +
    +    sub_parser.set_defaults(func=kill)
    +
    +
    +def check_positive(value):
    +    ivalue = int(value)
    +    if ivalue <= 0:
    +        raise argparse.ArgumentTypeError("%s is not a positive integer" % 
value)
    +    return ivalue
    +
    +def check_even_list(cred_list):
    +    if not (len(cred_list) % 2):
    +        raise argparse.ArgumentTypeError("please provide a list of cred 
key and value pairs")
    +    return cred_list
    +
    +
    +def initialize_upload_credentials_subcommand(subparsers):
    +    command_help = """Uploads a new set of credentials to a running 
topology."""
    +    sub_parser = subparsers.add_parser("upload-credentials", 
help=command_help)
    +
    +    sub_parser.add_argument("topology-name")
    +
    +    sub_parser.add_argument(
    +        "-f", "--file", default=None,
    +        help="""provide a properties file with credentials in it to be 
uploaded"""
    +    )
    +
    +    sub_parser.add_argument(
    +        "-u", "--user", default=None,
    +        help="""name of the owner of the topology (security precaution)"""
    +    )
    +
    +    sub_parser.add_argument(
    +        "cred_list", nargs='*', help="List of credkeys and their values 
[credkey credvalue]*",
    +        type=check_even_list
    +    )
    +
    +    sub_parser.set_defaults(func=upload_credentials)
    +
    +
    +def initialize_sql_subcommand(subparsers):
    +    command_help = """Compiles the SQL statements into a Trident topology 
and submits it to Storm.
    +    If user activates explain mode, SQL Runner analyzes each query 
statement
    +    and shows query plan instead of submitting topology.
    +    """
    +
    +    sub_parser = subparsers.add_parser("sql", help=command_help)
    +
    +    add_client_jar_options(sub_parser)
    +
    +    sub_parser.add_argument("sql_file", metavar="sql-file")
    +    sub_parser.add_argument(
    +        "topology_name", metavar="topology-name", help="should be 
--explain to activate explain mode"
    --- End diff --
    
    The positional arguments added on L520 do not allow mutual 
exclusion.


---

Reply via email to