This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 39cf3a7 Correct heron-executor CLI usage (#3587)
39cf3a7 is described below
commit 39cf3a76b143b5c491de88cfe40cd83f29d2c0be
Author: Oliver Bristow <[email protected]>
AuthorDate: Mon Jul 27 14:55:34 2020 +0100
Correct heron-executor CLI usage (#3587)
---
heron/executor/src/python/BUILD | 2 +-
heron/executor/src/python/heron_executor.py | 199 +++++++++------------
.../tests/python/heron_executor_unittest.py | 10 +-
3 files changed, 87 insertions(+), 124 deletions(-)
diff --git a/heron/executor/src/python/BUILD b/heron/executor/src/python/BUILD
index 432a529..392326c 100644
--- a/heron/executor/src/python/BUILD
+++ b/heron/executor/src/python/BUILD
@@ -3,7 +3,7 @@ package(default_visibility = ["//visibility:public"])
pex_library(
name = "executor-py",
srcs = ["heron_executor.py"],
- reqs = ["PyYAML==3.13"],
+ reqs = ["PyYAML==3.13", "click==7.1.2"],
deps = [
"//heron/common/src/python:common-py",
"//heron/statemgrs/src/python:statemgr-py",
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 68c094d..491cbdc 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -17,9 +17,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""
+This CLI manages the execution of a topology binary.
+
+"""
-""" The Heron executor is a process that runs on a container and is responsible for starting and
-monitoring the processes of the topology and it's support services."""
import argparse
import atexit
import base64
@@ -38,7 +40,6 @@ import time
import socket
import traceback
import itertools
-import yaml
from heron.common.src.python.utils import log
from heron.common.src.python.utils import proc
@@ -48,42 +49,83 @@ from heron.statemgrs.src.python import statemanagerfactory
from heron.statemgrs.src.python import configloader
from heron.statemgrs.src.python.config import Config as StateMgrConfig
+import click
+import yaml
+
Log = log.Log
# pylint: disable=too-many-lines
-def print_usage():
- print(
- "Usage: ./heron-executor --shard=<shardid> --topology-name=<topname>"
- " --topology-id=<topid> --topology-defn-file=<topdefnfile>"
- " --state-manager-connection=<state_manager_connection>"
- " --state-manager-root=<state_manager_root>"
- " --state-manager-config-file=<state_manager_config_file>"
- " --tmaster-binary=<tmaster_binary>"
- " --stmgr-binary=<stmgr_binary> --metrics-manager-classpath=<metricsmgr_classpath>"
- " --instance-jvm-opts=<instance_jvm_opts_in_base64> --classpath=<classpath>"
- " --master-port=<master_port> --tmaster-controller-port=<tmaster_controller_port>"
- " --tmaster-stats-port=<tmaster_stats_port>"
- " --heron-internals-config-file=<heron_internals_config_file>"
- " --override-config-file=<override_config_file> --component-ram-map=<component_ram_map>"
- " --component-jvm-opts=<component_jvm_opts_in_base64> --pkg-type=<pkg_type>"
- " --topology-binary-file=<topology_bin_file> --heron-java-home=<heron_java_home>"
- " --shell-port=<shell-port> --heron-shell-binary=<heron_shell_binary>"
- " --metrics-manager-port=<metricsmgr_port>"
- " --cluster=<cluster> --role=<role> --environment=<environ>"
- " --instance-classpath=<instance_classpath>"
- " --metrics-sinks-config-file=<metrics_sinks_config_file>"
- " --scheduler-classpath=<scheduler_classpath> --scheduler-port=<scheduler_port>"
- " --python-instance-binary=<python_instance_binary>"
- " --metricscache-manager-classpath=<metricscachemgr_classpath>"
- " --metricscache-manager-master-port=<metricscachemgr_masterport>"
- " --metricscache-manager-stats-port=<metricscachemgr_statsport>"
- " --is-stateful=<is_stateful> --checkpoint-manager-classpath=<ckptmgr_classpath>"
- " --checkpoint-manager-port=<ckptmgr_port> --checkpoint-manager-ram=<checkpoint_manager_ram>"
- " --stateful-config-file=<stateful_config_file>"
- " --health-manager-mode=<healthmgr_mode> --health-manager-classpath=<healthmgr_classpath>"
- " --cpp-instance-binary=<cpp_instance_binary>"
- " --jvm-remote-debugger-ports=<comma_seperated_port_list>")
+@click.command()
+@click.option("--cluster", required=True)
+@click.option("--role", required=True)
+@click.option("--environment", required=True)
+@click.option("--checkpoint-manager-classpath", required=True)
+@click.option("--checkpoint-manager-port", required=True)
+@click.option("--checkpoint-manager-ram", type=int, required=True)
+@click.option("--classpath", required=True)
+@click.option("--component-jvm-opts", required=True)
+@click.option("--component-ram-map", required=True)
+@click.option("--cpp-instance-binary", required=True)
+@click.option("--health-manager-classpath", required=True)
+@click.option("--health-manager-mode", required=True)
+@click.option("--heron-internals-config-file", required=True)
+@click.option("--heron-java-home", required=True)
+@click.option("--heron-shell-binary", required=True)
+@click.option("--instance-classpath", required=True)
+@click.option("--instance-jvm-opts", required=True)
+@click.option("--is-stateful", required=True)
+@click.option("--master-port", required=True)
+@click.option("--metrics-manager-classpath", required=True)
+@click.option("--metrics-manager-port", required=True)
+@click.option("--metrics-sinks-config-file", required=True)
+@click.option("--metricscache-manager-classpath", required=True)
+@click.option("--metricscache-manager-master-port", required=True)
+@click.option("--metricscache-manager-mode", required=False)
+@click.option("--metricscache-manager-stats-port", required=True)
+@click.option("--override-config-file", required=True)
+@click.option("--pkg-type", required=True)
+@click.option("--python-instance-binary", required=True)
+@click.option("--scheduler-classpath", required=True)
+@click.option("--scheduler-port", required=True)
+@click.option("--shard", type=int, required=True)
+@click.option("--shell-port", required=True)
+@click.option("--state-manager-config-file", required=True)
+@click.option("--state-manager-connection", required=True)
+@click.option("--state-manager-root", required=True)
+@click.option("--stateful-config-file", required=True)
+@click.option("--stmgr-binary", required=True)
+@click.option("--tmaster-binary", required=True)
+@click.option("--tmaster-controller-port", required=True)
+@click.option("--tmaster-stats-port", required=True)
+@click.option("--topology-binary-file", required=True)
+@click.option("--topology-defn-file", required=True)
+@click.option("--topology-id", required=True)
+@click.option("--topology-name", required=True)
+@click.option("--jvm-remote-debugger-ports",
+ help="comma separated list of ports to be used"
+ " by a remote debugger for JVM instances")
+def cli(
+ **kwargs: dict,
+) -> None:
+ """
+ The Heron executor is a process that runs on a container and is responsible for
+ starting and monitoring the processes of the topology and it's support services.
+
+ """
+ # Since Heron on YARN runs as headless users, pex compiled
+ # binaries should be exploded into the container working
+ # directory. In order to do this, we need to set the
+ # PEX_ROOT shell environment before forking the processes
+ shell_env = os.environ.copy()
+ shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
+
+ parsed_args = argparse.Namespace(**kwargs)
+ # Instantiate the executor, bind it to signal handlers and launch it
+ executor = HeronExecutor(parsed_args, shell_env)
+ executor.initialize()
+
+ start(executor)
def id_map(prefix, container_plans, add_zero_id=False):
ids = {}
@@ -304,8 +346,7 @@ class HeronExecutor:
parsed_args.jvm_remote_debugger_ports.split(",") \
if parsed_args.jvm_remote_debugger_ports else None
- def __init__(self, args, shell_env):
- parsed_args = self.parse_args(args)
+ def __init__(self, parsed_args, shell_env):
self.init_from_parsed_args(parsed_args)
self.shell_env = shell_env
@@ -330,69 +371,6 @@ class HeronExecutor:
self.state_managers = []
self.jvm_version = None
- @staticmethod
- def parse_args(args):
- """Uses python argparse to collect positional args"""
- Log.info("Input args: %r" % args)
-
- parser = argparse.ArgumentParser()
-
- parser.add_argument("--shard", type=int, required=True)
- parser.add_argument("--topology-name", required=True)
- parser.add_argument("--topology-id", required=True)
- parser.add_argument("--topology-defn-file", required=True)
- parser.add_argument("--state-manager-connection", required=True)
- parser.add_argument("--state-manager-root", required=True)
- parser.add_argument("--state-manager-config-file", required=True)
- parser.add_argument("--tmaster-binary", required=True)
- parser.add_argument("--stmgr-binary", required=True)
- parser.add_argument("--metrics-manager-classpath", required=True)
- parser.add_argument("--instance-jvm-opts", required=True)
- parser.add_argument("--classpath", required=True)
- parser.add_argument("--master-port", required=True)
- parser.add_argument("--tmaster-controller-port", required=True)
- parser.add_argument("--tmaster-stats-port", required=True)
- parser.add_argument("--heron-internals-config-file", required=True)
- parser.add_argument("--override-config-file", required=True)
- parser.add_argument("--component-ram-map", required=True)
- parser.add_argument("--component-jvm-opts", required=True)
- parser.add_argument("--pkg-type", required=True)
- parser.add_argument("--topology-binary-file", required=True)
- parser.add_argument("--heron-java-home", required=True)
- parser.add_argument("--shell-port", required=True)
- parser.add_argument("--heron-shell-binary", required=True)
- parser.add_argument("--metrics-manager-port", required=True)
- parser.add_argument("--cluster", required=True)
- parser.add_argument("--role", required=True)
- parser.add_argument("--environment", required=True)
- parser.add_argument("--instance-classpath", required=True)
- parser.add_argument("--metrics-sinks-config-file", required=True)
- parser.add_argument("--scheduler-classpath", required=True)
- parser.add_argument("--scheduler-port", required=True)
- parser.add_argument("--python-instance-binary", required=True)
- parser.add_argument("--cpp-instance-binary", required=True)
- parser.add_argument("--metricscache-manager-classpath", required=True)
- parser.add_argument("--metricscache-manager-master-port", required=True)
- parser.add_argument("--metricscache-manager-stats-port", required=True)
- parser.add_argument("--metricscache-manager-mode", required=False)
- parser.add_argument("--is-stateful", required=True)
- parser.add_argument("--checkpoint-manager-classpath", required=True)
- parser.add_argument("--checkpoint-manager-port", required=True)
- parser.add_argument("--checkpoint-manager-ram", type=int, required=True)
- parser.add_argument("--stateful-config-file", required=True)
- parser.add_argument("--health-manager-mode", required=True)
- parser.add_argument("--health-manager-classpath", required=True)
- parser.add_argument("--jvm-remote-debugger-ports", required=False,
- help="ports to be used by a remote debugger for JVM instances")
-
- parsed_args, unknown_args = parser.parse_known_args(args[1:])
-
- if unknown_args:
- Log.warn('Unknown arguments found!!! They are: %s' % unknown_args)
- Log.warn(parser.format_help())
-
- return parsed_args
-
def run_command_or_exit(self, command):
if self._run_blocking_process(command, True) != 0:
Log.error("Failed to run command: %s. Exiting" % command)
@@ -982,7 +960,7 @@ class HeronExecutor:
# Now wait for any child to die
Log.info("Start process monitor")
while True:
- if len(self.processes_to_monitor) > 0:
+ if self.processes_to_monitor:
(pid, status) = os.wait()
with self.process_lock:
@@ -1100,7 +1078,7 @@ class HeronExecutor:
with open(self.override_config_file, 'r') as stream:
overrides = yaml.load(stream)
if overrides is None:
- overrides = dict()
+ overrides = {}
overrides["heron.statemgr.connection.string"] = self.state_manager_connection
statemgr_config = StateMgrConfig()
@@ -1201,20 +1179,5 @@ def start(executor):
# they are dead. This is the main loop of executor
executor.start_process_monitor()
-def main():
- """Register exit handlers, initialize the executor and run it."""
- # Since Heron on YARN runs as headless users, pex compiled
- # binaries should be exploded into the container working
- # directory. In order to do this, we need to set the
- # PEX_ROOT shell environment before forking the processes
- shell_env = os.environ.copy()
- shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
-
- # Instantiate the executor, bind it to signal handlers and launch it
- executor = HeronExecutor(sys.argv, shell_env)
- executor.initialize()
-
- start(executor)
-
if __name__ == "__main__":
- main()
+ cli()
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index f097999..84481a1 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -19,6 +19,7 @@
# under the License.
'''heron executor unittest'''
+import argparse
import os
import socket
import unittest
@@ -26,8 +27,7 @@ import json
from pprint import pprint
-from heron.executor.src.python.heron_executor import ProcessInfo
-from heron.executor.src.python.heron_executor import HeronExecutor
+from heron.executor.src.python.heron_executor import cli, HeronExecutor, ProcessInfo
from heron.proto.packing_plan_pb2 import PackingPlan
# pylint: disable=unused-argument
@@ -299,9 +299,9 @@ class HeronExecutorTest(unittest.TestCase):
("--metricscache-manager-mode", "cluster")
]
- args = ("%s=%s" % (arg[0], (str(arg[1]))) for arg in executor_args)
- command = "./heron-executor %s" % (" ".join(args))
- return command.split()
+ args = [f"{k}={v}" for k, v in executor_args]
+ ctx = cli.make_context('heron-executor', args)
+ return argparse.Namespace(**ctx.params)
def test_update_packing_plan(self):
self.executor_0.update_packing_plan(self.packing_plan_expected)