This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 9d24c94 Enable gperftool HEAPCHECK in integration test (#3201)
9d24c94 is described below
commit 9d24c943c8a5ec51c25c58a7953f9f9447510e97
Author: Xiaoyao Qian <[email protected]>
AuthorDate: Tue Mar 12 13:19:12 2019 -0700
Enable gperftool HEAPCHECK in integration test (#3201)
* enable heapcheck
* refactor commands in heron-executor to carry per-process env variables
---
.travis.yml | 4 +-
heron/executor/src/python/heron_executor.py | 114 +++++++++++++++------
.../tests/python/heron_executor_unittest.py | 7 +-
3 files changed, 92 insertions(+), 33 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 065aa74..b8c0c4e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,9 +22,11 @@ addons:
- pkg-config
- zip
- zlib1g-dev
+ - google-perftools
+ - libgoogle-perftools-dev
env:
- - CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8
+ - CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8 ENABLE_HEAPCHECK=1
before_install:
# download and install bazel
diff --git a/heron/executor/src/python/heron_executor.py
b/heron/executor/src/python/heron_executor.py
index 39a8f98..63939ce 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -146,6 +146,37 @@ def stdout_log_fn(cmd):
# Log the messages to stdout and strip off the newline because Log.info adds
one automatically
return lambda line: Log.info("%s stdout: %s", cmd, line.rstrip('\n'))
+class Command(object):
+  """
+  Command to run as a separate process using subprocess.Popen
+  :param cmd: command to run, either a list of argv tokens or a single string
+              (a single string is wrapped into a one-element list)
+  :param env: env variables for the process (as a dict), or None to let
+              subprocess.Popen inherit the parent's environment
+  """
+  def __init__(self, cmd, env=None):
+    if isinstance(cmd, list):
+      self.cmd = cmd
+    elif isinstance(cmd, tuple):
+      self.cmd = list(cmd)
+    else:
+      self.cmd = [cmd]
+    self.env = env
+
+  def extend(self, args):
+    """Append several argv tokens in place."""
+    self.cmd.extend(args)
+
+  def append(self, arg):
+    """Append a single argv token in place."""
+    self.cmd.append(arg)
+
+  def copy(self):
+    """Return a new Command with its own argv list and env dict."""
+    # Keep env=None as None: Popen(env=None) inherits the parent environment,
+    # whereas Popen(env={}) would start the child with an EMPTY environment.
+    env = self.env.copy() if self.env is not None else None
+    return Command(list(self.cmd), env)
+
+  def __repr__(self):
+    return str(self.cmd)
+
+  def __str__(self):
+    return ' '.join(self.cmd)
+
+  def __eq__(self, other):
+    # Equality compares only the argv; env is not considered.
+    if not isinstance(other, Command):
+      return NotImplemented
+    return self.cmd == other.cmd
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__, so define it explicitly.
+    result = self.__eq__(other)
+    if result is NotImplemented:
+      return result
+    return not result
+
class ProcessInfo(object):
def __init__(self, process, name, command, attempts=1):
"""
@@ -159,7 +190,7 @@ class ProcessInfo(object):
self.pid = process.pid
self.name = name
self.command = command
- self.command_str = ' '.join(command) # convenience for unit tests
+ self.command_str = command.__str__() # convenience for unit tests
self.attempts = attempts
def increment_attempts(self):
@@ -352,7 +383,7 @@ class HeronExecutor(object):
return parsed_args
def run_command_or_exit(self, command):
- if self._run_blocking_process(command, True, self.shell_env) != 0:
+ if self._run_blocking_process(command, True) != 0:
Log.error("Failed to run command: %s. Exiting" % command)
sys.exit(1)
@@ -363,10 +394,10 @@ class HeronExecutor(object):
2. We don't initialize the logger (also something unit tests don't want)
until after the
constructor
"""
- create_folders = 'mkdir -p %s' % self.log_dir
+ create_folders = Command('mkdir -p %s' % self.log_dir, self.shell_env)
self.run_command_or_exit(create_folders)
- chmod_logs_dir = 'chmod a+rx . && chmod a+x %s' % self.log_dir
+ chmod_logs_dir = Command('chmod a+rx . && chmod a+x %s' % self.log_dir,
self.shell_env)
self.run_command_or_exit(chmod_logs_dir)
chmod_x_binaries = [self.tmaster_binary, self.stmgr_binary,
self.heron_shell_binary]
@@ -374,7 +405,7 @@ class HeronExecutor(object):
for binary in chmod_x_binaries:
stat_result = os.stat(binary)[stat.ST_MODE]
if not stat_result & stat.S_IXOTH:
- chmod_binary = 'chmod +x %s' % binary
+ chmod_binary = Command('chmod +x %s' % binary, self.shell_env)
self.run_command_or_exit(chmod_binary)
# Log itself pid
@@ -432,7 +463,7 @@ class HeronExecutor(object):
'--override-config-file=' + self.override_config_file,
'--sink-config-file=' + sink_config_file]
- return metricsmgr_cmd
+ return Command(metricsmgr_cmd, self.shell_env)
def _get_metrics_cache_cmd(self):
''' get the command to start the metrics manager processes '''
@@ -474,7 +505,7 @@ class HeronExecutor(object):
"--role", self.role,
"--environment", self.environment]
- return metricscachemgr_cmd
+ return Command(metricscachemgr_cmd, self.shell_env)
def _get_healthmgr_cmd(self):
''' get the command to start the topology health manager processes '''
@@ -509,12 +540,12 @@ class HeronExecutor(object):
"--topology_name", self.topology_name,
"--metricsmgr_port", self.metrics_manager_port]
- return healthmgr_cmd
+ return Command(healthmgr_cmd, self.shell_env)
def _get_tmaster_processes(self):
''' get the command to start the tmaster processes '''
retval = {}
- tmaster_cmd = [
+ tmaster_cmd_lst = [
self.tmaster_binary,
'--topology_name=%s' % self.topology_name,
'--topology_id=%s' % self.topology_id,
@@ -529,8 +560,16 @@ class HeronExecutor(object):
'--metrics_sinks_yaml=%s' % self.metrics_sinks_config_file,
'--metricsmgr_port=%s' % str(self.metrics_manager_port),
'--ckptmgr_port=%s' % str(self.checkpoint_manager_port)]
- retval["heron-tmaster"] = tmaster_cmd
+ tmaster_env = self.shell_env.copy() if self.shell_env is not None else {}
+ tmaster_cmd = Command(tmaster_cmd_lst, tmaster_env)
+ if os.environ.get('ENABLE_HEAPCHECK') is not None:
+ tmaster_cmd.env.update({
+ 'LD_PRELOAD': "/usr/lib/libtcmalloc.so",
+ 'HEAPCHECK': "normal"
+ })
+
+ retval["heron-tmaster"] = tmaster_cmd
if self.metricscache_manager_mode.lower() != "disabled":
retval["heron-metricscache"] = self._get_metrics_cache_cmd()
@@ -565,8 +604,7 @@ class HeronExecutor(object):
if self.jvm_remote_debugger_ports:
remote_debugger_port = self.jvm_remote_debugger_ports.pop()
- instance_cmd = []
- instance_cmd.append(self._get_jvm_instance_cmd()) # JVM
command
+ instance_cmd = self._get_jvm_instance_cmd().copy() # JVM
command
instance_cmd.extend( # JVM
options
self._get_jvm_instance_options(
instance_id, component_name, remote_debugger_port))
@@ -580,7 +618,7 @@ class HeronExecutor(object):
return retval
def _get_jvm_instance_cmd(self):
- return os.path.join(self.heron_java_home, 'bin/java')
+ return Command(os.path.join(self.heron_java_home, 'bin/java'),
self.shell_env)
def _get_jvm_instance_options(self, instance_id, component_name,
remote_debugger_port):
code_cache_size_mb = 64
@@ -700,7 +738,7 @@ class HeronExecutor(object):
'--topology_pex=%s' % self.topology_binary_file,
'--max_ram=%s' %
str(self.component_ram_map[component_name])]
- retval[instance_id] = instance_cmd
+ retval[instance_id] = Command(instance_cmd, self.shell_env)
return retval
@@ -727,7 +765,7 @@ class HeronExecutor(object):
'--topology_binary=%s' % os.path.abspath(self.topology_binary_file)
]
- retval[instance_id] = instance_cmd
+ retval[instance_id] = Command(instance_cmd, self.shell_env)
return retval
@@ -748,7 +786,7 @@ class HeronExecutor(object):
instance_id = "container_%s_%s_%d" % (str(self.shard), component_name,
global_task_id)
instance_info.append((instance_id, component_name, global_task_id,
component_index))
- stmgr_cmd = [
+ stmgr_cmd_lst = [
self.stmgr_binary,
'--topology_name=%s' % self.topology_name,
'--topology_id=%s' % self.topology_id,
@@ -767,6 +805,15 @@ class HeronExecutor(object):
'--ckptmgr_port=%s' % str(self.checkpoint_manager_port),
'--ckptmgr_id=%s' % self.ckptmgr_ids[self.shard],
'--metricscachemgr_mode=%s' % self.metricscache_manager_mode.lower()]
+
+ stmgr_env = self.shell_env.copy() if self.shell_env is not None else {}
+ stmgr_cmd = Command(stmgr_cmd_lst, stmgr_env)
+ if os.environ.get('ENABLE_HEAPCHECK') is not None:
+ stmgr_cmd.env.update({
+ 'LD_PRELOAD': "/usr/lib/libtcmalloc.so",
+ 'HEAPCHECK': "normal"
+ })
+
retval[self.stmgr_ids[self.shard]] = stmgr_cmd
# metricsmgr_metrics_sink_config_file = 'metrics_sinks.yaml'
@@ -829,7 +876,7 @@ class HeronExecutor(object):
'-f' + self.stateful_config_file,
'-g' + self.heron_internals_config_file]
retval = {}
- retval[self.ckptmgr_ids[self.shard]] = ckptmgr_cmd
+ retval[self.ckptmgr_ids[self.shard]] = Command(ckptmgr_cmd, self.shell_env)
return retval
@@ -856,11 +903,11 @@ class HeronExecutor(object):
""" Get a map from all daemon services' name to the command to start them
"""
retval = {}
- retval[self.heron_shell_ids[self.shard]] = [
+ retval[self.heron_shell_ids[self.shard]] = Command([
'%s' % self.heron_shell_binary,
'--port=%s' % self.shell_port,
'--log_file_prefix=%s/heron-shell-%s.log' % (self.log_dir, self.shard),
- '--secret=%s' % self.topology_id]
+ '--secret=%s' % self.topology_id], self.shell_env)
return retval
@@ -876,14 +923,13 @@ class HeronExecutor(object):
proc.stream_process_stdout(process, stdout_log_fn(name))
process.wait()
- def _run_process(self, name, cmd, env_to_exec=None):
- Log.info("Running %s process as %s" % (name, ' '.join(cmd)))
+ def _run_process(self, name, cmd):
+ Log.info("Running %s process as %s" % (name, cmd))
try:
# stderr is redirected to stdout so that it can more easily be logged.
stderr has a max buffer
# size and can cause the child process to deadlock if it fills up
- process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
- env=env_to_exec, bufsize=1)
-
+ process = subprocess.Popen(cmd.cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
+ env=cmd.env, bufsize=1)
proc.async_stream_process_stdout(process, stdout_log_fn(name))
except Exception:
Log.info("Exception running command %s", cmd)
@@ -891,16 +937,16 @@ class HeronExecutor(object):
return process
- def _run_blocking_process(self, cmd, is_shell=False, env_to_exec=None):
+ def _run_blocking_process(self, cmd, is_shell=False):
Log.info("Running blocking process as %s" % cmd)
try:
# stderr is redirected to stdout so that it can more easily be logged.
stderr has a max buffer
# size and can cause the child process to deadlock if it fills up
- process = subprocess.Popen(cmd, shell=is_shell, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT, env=env_to_exec)
+ process = subprocess.Popen(cmd.cmd, shell=is_shell,
stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT, env=cmd.env)
# wait for termination
- self._wait_process_std_out_err(cmd, process)
+ self._wait_process_std_out_err(cmd.cmd, process)
except Exception:
Log.info("Exception running command %s", cmd)
traceback.print_exc()
@@ -916,7 +962,7 @@ class HeronExecutor(object):
if process_info.name == command_name:
del self.processes_to_monitor[process_info.pid]
Log.info("Killing %s process with pid %d: %s" %
- (process_info.name, process_info.pid, ' '.join(command)))
+ (process_info.name, process_info.pid, command))
try:
process_info.process.terminate() # sends SIGTERM to process
except OSError as e:
@@ -932,7 +978,7 @@ class HeronExecutor(object):
processes_to_monitor = {}
# First start all the processes
for (name, command) in commands.items():
- p = self._run_process(name, command, self.shell_env)
+ p = self._run_process(name, command)
processes_to_monitor[p.pid] = ProcessInfo(p, name, command)
# Log down the pid file
@@ -967,7 +1013,7 @@ class HeronExecutor(object):
Log.info("%s exited too many times" % name)
sys.exit(1)
time.sleep(self.interval_between_runs)
- p = self._run_process(name, command, self.shell_env)
+ p = self._run_process(name, command)
del self.processes_to_monitor[pid]
self.processes_to_monitor[p.pid] =\
ProcessInfo(p, name, command, old_process_info.attempts + 1)
@@ -1120,6 +1166,12 @@ def setup(executor):
1. Terminate all children processes
"""
Log.info('Executor terminated; exiting all process in executor.')
+
+ # Kill child processes first and wait for log collection to finish
+ for pid in executor.processes_to_monitor.keys():
+ os.kill(pid, signal.SIGTERM)
+ time.sleep(5)
+
# We would not wait or check whether process spawned dead or not
os.killpg(0, signal.SIGTERM)
diff --git a/heron/executor/tests/python/heron_executor_unittest.py
b/heron/executor/tests/python/heron_executor_unittest.py
index 7571d14..d1debb0 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -47,6 +47,11 @@ def get_test_heron_internal_yaml():
INTERNAL_CONF_PATH, OVERRIDE_PATH = get_test_heron_internal_yaml()
HOSTNAME = socket.gethostname()
+class CommandEncoder(json.JSONEncoder):
+  """Customized JSONEncoder that serializes Command objects as their argv list.
+
+  Any other non-serializable object is delegated to json.JSONEncoder.default,
+  which raises TypeError as the json module expects.
+  """
+  def default(self, o):
+    cmd = getattr(o, 'cmd', None)
+    if isinstance(cmd, list):
+      return cmd
+    return json.JSONEncoder.default(self, o)
+
class MockPOpen(object):
"""fake subprocess.Popen object that we can use to mock processes and pids"""
next_pid = 0
@@ -335,7 +340,7 @@ class HeronExecutorTest(unittest.TestCase):
map((lambda process_info: (process_info.name,
process_info.command.split(' '))),
self.expected_processes_container_1))
- current_json = json.dumps(current_commands, sort_keys=True).split(' ')
+ current_json = json.dumps(current_commands, sort_keys=True,
cls=CommandEncoder).split(' ')
temp_json = json.dumps(temp_dict, sort_keys=True).split(' ')
print ("current_json: %s" % current_json)