This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 39cf3a7  Correct heron-executor CLI usage (#3587)
39cf3a7 is described below

commit 39cf3a76b143b5c491de88cfe40cd83f29d2c0be
Author: Oliver Bristow <[email protected]>
AuthorDate: Mon Jul 27 14:55:34 2020 +0100

    Correct heron-executor CLI usage (#3587)
---
 heron/executor/src/python/BUILD                    |   2 +-
 heron/executor/src/python/heron_executor.py        | 199 +++++++++------------
 .../tests/python/heron_executor_unittest.py        |  10 +-
 3 files changed, 87 insertions(+), 124 deletions(-)

diff --git a/heron/executor/src/python/BUILD b/heron/executor/src/python/BUILD
index 432a529..392326c 100644
--- a/heron/executor/src/python/BUILD
+++ b/heron/executor/src/python/BUILD
@@ -3,7 +3,7 @@ package(default_visibility = ["//visibility:public"])
 pex_library(
     name = "executor-py",
     srcs = ["heron_executor.py"],
-    reqs = ["PyYAML==3.13"],
+    reqs = ["PyYAML==3.13", "click==7.1.2"],
     deps = [
         "//heron/common/src/python:common-py",
         "//heron/statemgrs/src/python:statemgr-py",
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 68c094d..491cbdc 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -17,9 +17,11 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
+"""
+This CLI manages the execution of a topology binary.
+
+"""
 
-""" The Heron executor is a process that runs on a container and is responsible for starting and
-monitoring the processes of the topology and it's support services."""
 import argparse
 import atexit
 import base64
@@ -38,7 +40,6 @@ import time
 import socket
 import traceback
 import itertools
-import yaml
 
 from heron.common.src.python.utils import log
 from heron.common.src.python.utils import proc
@@ -48,42 +49,83 @@ from heron.statemgrs.src.python import statemanagerfactory
 from heron.statemgrs.src.python import configloader
 from heron.statemgrs.src.python.config import Config as StateMgrConfig
 
+import click
+import yaml
+
 Log = log.Log
 
 # pylint: disable=too-many-lines
 
-def print_usage():
-  print(
-      "Usage: ./heron-executor --shard=<shardid> --topology-name=<topname>"
-      " --topology-id=<topid> --topology-defn-file=<topdefnfile>"
-      " --state-manager-connection=<state_manager_connection>"
-      " --state-manager-root=<state_manager_root>"
-      " --state-manager-config-file=<state_manager_config_file>"
-      " --tmaster-binary=<tmaster_binary>"
-      " --stmgr-binary=<stmgr_binary> --metrics-manager-classpath=<metricsmgr_classpath>"
-      " --instance-jvm-opts=<instance_jvm_opts_in_base64> --classpath=<classpath>"
-      " --master-port=<master_port> --tmaster-controller-port=<tmaster_controller_port>"
-      " --tmaster-stats-port=<tmaster_stats_port>"
-      " --heron-internals-config-file=<heron_internals_config_file>"
-      " --override-config-file=<override_config_file> --component-ram-map=<component_ram_map>"
-      " --component-jvm-opts=<component_jvm_opts_in_base64> --pkg-type=<pkg_type>"
-      " --topology-binary-file=<topology_bin_file> --heron-java-home=<heron_java_home>"
-      " --shell-port=<shell-port> --heron-shell-binary=<heron_shell_binary>"
-      " --metrics-manager-port=<metricsmgr_port>"
-      " --cluster=<cluster> --role=<role> --environment=<environ>"
-      " --instance-classpath=<instance_classpath>"
-      " --metrics-sinks-config-file=<metrics_sinks_config_file>"
-      " --scheduler-classpath=<scheduler_classpath> --scheduler-port=<scheduler_port>"
-      " --python-instance-binary=<python_instance_binary>"
-      " --metricscache-manager-classpath=<metricscachemgr_classpath>"
-      " --metricscache-manager-master-port=<metricscachemgr_masterport>"
-      " --metricscache-manager-stats-port=<metricscachemgr_statsport>"
-      " --is-stateful=<is_stateful> --checkpoint-manager-classpath=<ckptmgr_classpath>"
-      " --checkpoint-manager-port=<ckptmgr_port> --checkpoint-manager-ram=<checkpoint_manager_ram>"
-      " --stateful-config-file=<stateful_config_file>"
-      " --health-manager-mode=<healthmgr_mode> --health-manager-classpath=<healthmgr_classpath>"
-      " --cpp-instance-binary=<cpp_instance_binary>"
-      " --jvm-remote-debugger-ports=<comma_seperated_port_list>")
+@click.command()
+@click.option("--cluster", required=True)
+@click.option("--role", required=True)
+@click.option("--environment", required=True)
+@click.option("--checkpoint-manager-classpath", required=True)
+@click.option("--checkpoint-manager-port", required=True)
+@click.option("--checkpoint-manager-ram", type=int, required=True)
+@click.option("--classpath", required=True)
+@click.option("--component-jvm-opts", required=True)
+@click.option("--component-ram-map", required=True)
+@click.option("--cpp-instance-binary", required=True)
+@click.option("--health-manager-classpath", required=True)
+@click.option("--health-manager-mode", required=True)
+@click.option("--heron-internals-config-file", required=True)
+@click.option("--heron-java-home", required=True)
+@click.option("--heron-shell-binary", required=True)
+@click.option("--instance-classpath", required=True)
+@click.option("--instance-jvm-opts", required=True)
+@click.option("--is-stateful", required=True)
+@click.option("--master-port", required=True)
+@click.option("--metrics-manager-classpath", required=True)
+@click.option("--metrics-manager-port", required=True)
+@click.option("--metrics-sinks-config-file", required=True)
+@click.option("--metricscache-manager-classpath", required=True)
+@click.option("--metricscache-manager-master-port", required=True)
+@click.option("--metricscache-manager-mode", required=False)
+@click.option("--metricscache-manager-stats-port", required=True)
+@click.option("--override-config-file", required=True)
+@click.option("--pkg-type", required=True)
+@click.option("--python-instance-binary", required=True)
+@click.option("--scheduler-classpath", required=True)
+@click.option("--scheduler-port", required=True)
+@click.option("--shard", type=int, required=True)
+@click.option("--shell-port", required=True)
+@click.option("--state-manager-config-file", required=True)
+@click.option("--state-manager-connection", required=True)
+@click.option("--state-manager-root", required=True)
+@click.option("--stateful-config-file", required=True)
+@click.option("--stmgr-binary", required=True)
+@click.option("--tmaster-binary", required=True)
+@click.option("--tmaster-controller-port", required=True)
+@click.option("--tmaster-stats-port", required=True)
+@click.option("--topology-binary-file", required=True)
+@click.option("--topology-defn-file", required=True)
+@click.option("--topology-id", required=True)
+@click.option("--topology-name", required=True)
+@click.option("--jvm-remote-debugger-ports",
+              help="comma separated list of ports to be used"
+                   " by a remote debugger for JVM instances")
+def cli(
+    **kwargs: dict,
+) -> None:
+  """
+  The Heron executor is a process that runs on a container and is responsible for
+  starting and monitoring the processes of the topology and it's support services.
+
+  """
+  # Since Heron on YARN runs as headless users, pex compiled
+  # binaries should be exploded into the container working
+  # directory. In order to do this, we need to set the
+  # PEX_ROOT shell environment before forking the processes
+  shell_env = os.environ.copy()
+  shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
+
+  parsed_args = argparse.Namespace(**kwargs)
+  # Instantiate the executor, bind it to signal handlers and launch it
+  executor = HeronExecutor(parsed_args, shell_env)
+  executor.initialize()
+
+  start(executor)
 
 def id_map(prefix, container_plans, add_zero_id=False):
   ids = {}
@@ -304,8 +346,7 @@ class HeronExecutor:
       parsed_args.jvm_remote_debugger_ports.split(",") \
         if parsed_args.jvm_remote_debugger_ports else None
 
-  def __init__(self, args, shell_env):
-    parsed_args = self.parse_args(args)
+  def __init__(self, parsed_args, shell_env):
     self.init_from_parsed_args(parsed_args)
 
     self.shell_env = shell_env
@@ -330,69 +371,6 @@ class HeronExecutor:
     self.state_managers = []
     self.jvm_version = None
 
-  @staticmethod
-  def parse_args(args):
-    """Uses python argparse to collect positional args"""
-    Log.info("Input args: %r" % args)
-
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument("--shard", type=int, required=True)
-    parser.add_argument("--topology-name", required=True)
-    parser.add_argument("--topology-id", required=True)
-    parser.add_argument("--topology-defn-file", required=True)
-    parser.add_argument("--state-manager-connection", required=True)
-    parser.add_argument("--state-manager-root", required=True)
-    parser.add_argument("--state-manager-config-file", required=True)
-    parser.add_argument("--tmaster-binary", required=True)
-    parser.add_argument("--stmgr-binary", required=True)
-    parser.add_argument("--metrics-manager-classpath", required=True)
-    parser.add_argument("--instance-jvm-opts", required=True)
-    parser.add_argument("--classpath", required=True)
-    parser.add_argument("--master-port", required=True)
-    parser.add_argument("--tmaster-controller-port", required=True)
-    parser.add_argument("--tmaster-stats-port", required=True)
-    parser.add_argument("--heron-internals-config-file", required=True)
-    parser.add_argument("--override-config-file", required=True)
-    parser.add_argument("--component-ram-map", required=True)
-    parser.add_argument("--component-jvm-opts", required=True)
-    parser.add_argument("--pkg-type", required=True)
-    parser.add_argument("--topology-binary-file", required=True)
-    parser.add_argument("--heron-java-home", required=True)
-    parser.add_argument("--shell-port", required=True)
-    parser.add_argument("--heron-shell-binary", required=True)
-    parser.add_argument("--metrics-manager-port", required=True)
-    parser.add_argument("--cluster", required=True)
-    parser.add_argument("--role", required=True)
-    parser.add_argument("--environment", required=True)
-    parser.add_argument("--instance-classpath", required=True)
-    parser.add_argument("--metrics-sinks-config-file", required=True)
-    parser.add_argument("--scheduler-classpath", required=True)
-    parser.add_argument("--scheduler-port", required=True)
-    parser.add_argument("--python-instance-binary", required=True)
-    parser.add_argument("--cpp-instance-binary", required=True)
-    parser.add_argument("--metricscache-manager-classpath", required=True)
-    parser.add_argument("--metricscache-manager-master-port", required=True)
-    parser.add_argument("--metricscache-manager-stats-port", required=True)
-    parser.add_argument("--metricscache-manager-mode", required=False)
-    parser.add_argument("--is-stateful", required=True)
-    parser.add_argument("--checkpoint-manager-classpath", required=True)
-    parser.add_argument("--checkpoint-manager-port", required=True)
-    parser.add_argument("--checkpoint-manager-ram", type=int, required=True)
-    parser.add_argument("--stateful-config-file", required=True)
-    parser.add_argument("--health-manager-mode", required=True)
-    parser.add_argument("--health-manager-classpath", required=True)
-    parser.add_argument("--jvm-remote-debugger-ports", required=False,
-                        help="ports to be used by a remote debugger for JVM instances")
-
-    parsed_args, unknown_args = parser.parse_known_args(args[1:])
-
-    if unknown_args:
-      Log.warn('Unknown arguments found!!! They are: %s' % unknown_args)
-      Log.warn(parser.format_help())
-
-    return parsed_args
-
   def run_command_or_exit(self, command):
     if self._run_blocking_process(command, True) != 0:
       Log.error("Failed to run command: %s. Exiting" % command)
@@ -982,7 +960,7 @@ class HeronExecutor:
     # Now wait for any child to die
     Log.info("Start process monitor")
     while True:
-      if len(self.processes_to_monitor) > 0:
+      if self.processes_to_monitor:
         (pid, status) = os.wait()
 
         with self.process_lock:
@@ -1100,7 +1078,7 @@ class HeronExecutor:
     with open(self.override_config_file, 'r') as stream:
       overrides = yaml.load(stream)
       if overrides is None:
-        overrides = dict()
+        overrides = {}
     overrides["heron.statemgr.connection.string"] = self.state_manager_connection
 
     statemgr_config = StateMgrConfig()
@@ -1201,20 +1179,5 @@ def start(executor):
   # they are dead. This is the main loop of executor
   executor.start_process_monitor()
 
-def main():
-  """Register exit handlers, initialize the executor and run it."""
-  # Since Heron on YARN runs as headless users, pex compiled
-  # binaries should be exploded into the container working
-  # directory. In order to do this, we need to set the
-  # PEX_ROOT shell environment before forking the processes
-  shell_env = os.environ.copy()
-  shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
-
-  # Instantiate the executor, bind it to signal handlers and launch it
-  executor = HeronExecutor(sys.argv, shell_env)
-  executor.initialize()
-
-  start(executor)
-
 if __name__ == "__main__":
-  main()
+  cli()
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index f097999..84481a1 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -19,6 +19,7 @@
 #  under the License.
 
 '''heron executor unittest'''
+import argparse
 import os
 import socket
 import unittest
@@ -26,8 +27,7 @@ import json
 
 from pprint import pprint
 
-from heron.executor.src.python.heron_executor import ProcessInfo
-from heron.executor.src.python.heron_executor import HeronExecutor
+from heron.executor.src.python.heron_executor import cli, HeronExecutor, ProcessInfo
 from heron.proto.packing_plan_pb2 import PackingPlan
 
 # pylint: disable=unused-argument
@@ -299,9 +299,9 @@ class HeronExecutorTest(unittest.TestCase):
       ("--metricscache-manager-mode", "cluster")
     ]
 
-    args = ("%s=%s" % (arg[0], (str(arg[1]))) for arg in executor_args)
-    command = "./heron-executor %s" % (" ".join(args))
-    return command.split()
+    args = [f"{k}={v}" for k, v in executor_args]
+    ctx = cli.make_context('heron-executor', args)
+    return argparse.Namespace(**ctx.params)
 
   def test_update_packing_plan(self):
     self.executor_0.update_packing_plan(self.packing_plan_expected)

Reply via email to