This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d24c94  Enable gperftool HEAPCHECK in integration test (#3201)
9d24c94 is described below

commit 9d24c943c8a5ec51c25c58a7953f9f9447510e97
Author: Xiaoyao Qian <[email protected]>
AuthorDate: Tue Mar 12 13:19:12 2019 -0700

    Enable gperftool HEAPCHECK in integration test (#3201)
    
    * enable heapcheck
    
    * refactor commands in heron-executor to carry per-process env variables
---
 .travis.yml                                        |   4 +-
 heron/executor/src/python/heron_executor.py        | 114 +++++++++++++++------
 .../tests/python/heron_executor_unittest.py        |   7 +-
 3 files changed, 92 insertions(+), 33 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 065aa74..b8c0c4e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,9 +22,11 @@ addons:
       - pkg-config
       - zip
       - zlib1g-dev
+      - google-perftools
+      - libgoogle-perftools-dev
 
 env:
-  - CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8
+  - CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8 ENABLE_HEAPCHECK=1
 
 before_install:
   # download and install bazel
diff --git a/heron/executor/src/python/heron_executor.py 
b/heron/executor/src/python/heron_executor.py
index 39a8f98..63939ce 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -146,6 +146,37 @@ def stdout_log_fn(cmd):
   # Log the messages to stdout and strip off the newline because Log.info adds 
one automatically
   return lambda line: Log.info("%s stdout: %s", cmd, line.rstrip('\n'))
 
+class Command(object):
+  """
+  Command to run as a separate process using subprocess.Popen
+  :param cmd: command to run (as a list)
+  :param env: env variables for the process (as a map)
+  """
+  def __init__(self, cmd, env):
+    if isinstance(cmd, list):
+      self.cmd = cmd
+    else:
+      self.cmd = [cmd]
+    self.env = env
+
+  def extend(self, args):
+    self.cmd.extend(args)
+
+  def append(self, arg):
+    self.cmd.append(arg)
+
+  def copy(self):
+    return Command(list(self.cmd), self.env.copy() if self.env is not None 
else {})
+
+  def __repr__(self):
+    return str(self.cmd)
+
+  def __str__(self):
+    return ' '.join(self.cmd)
+
+  def __eq__(self, other):
+    return self.cmd == other.cmd
+
 class ProcessInfo(object):
   def __init__(self, process, name, command, attempts=1):
     """
@@ -159,7 +190,7 @@ class ProcessInfo(object):
     self.pid = process.pid
     self.name = name
     self.command = command
-    self.command_str = ' '.join(command) # convenience for unit tests
+    self.command_str = command.__str__() # convenience for unit tests
     self.attempts = attempts
 
   def increment_attempts(self):
@@ -352,7 +383,7 @@ class HeronExecutor(object):
     return parsed_args
 
   def run_command_or_exit(self, command):
-    if self._run_blocking_process(command, True, self.shell_env) != 0:
+    if self._run_blocking_process(command, True) != 0:
       Log.error("Failed to run command: %s. Exiting" % command)
       sys.exit(1)
 
@@ -363,10 +394,10 @@ class HeronExecutor(object):
     2. We don't initialize the logger (also something unit tests don't want) 
until after the
     constructor
     """
-    create_folders = 'mkdir -p %s' % self.log_dir
+    create_folders = Command('mkdir -p %s' % self.log_dir, self.shell_env)
     self.run_command_or_exit(create_folders)
 
-    chmod_logs_dir = 'chmod a+rx . && chmod a+x %s' % self.log_dir
+    chmod_logs_dir = Command('chmod a+rx . && chmod a+x %s' % self.log_dir, 
self.shell_env)
     self.run_command_or_exit(chmod_logs_dir)
 
     chmod_x_binaries = [self.tmaster_binary, self.stmgr_binary, 
self.heron_shell_binary]
@@ -374,7 +405,7 @@ class HeronExecutor(object):
     for binary in chmod_x_binaries:
       stat_result = os.stat(binary)[stat.ST_MODE]
       if not stat_result & stat.S_IXOTH:
-        chmod_binary = 'chmod +x %s' % binary
+        chmod_binary = Command('chmod +x %s' % binary, self.shell_env)
         self.run_command_or_exit(chmod_binary)
 
     # Log itself pid
@@ -432,7 +463,7 @@ class HeronExecutor(object):
                       '--override-config-file=' + self.override_config_file,
                       '--sink-config-file=' + sink_config_file]
 
-    return metricsmgr_cmd
+    return Command(metricsmgr_cmd, self.shell_env)
 
   def _get_metrics_cache_cmd(self):
     ''' get the command to start the metrics manager processes '''
@@ -474,7 +505,7 @@ class HeronExecutor(object):
                            "--role", self.role,
                            "--environment", self.environment]
 
-    return metricscachemgr_cmd
+    return Command(metricscachemgr_cmd, self.shell_env)
 
   def _get_healthmgr_cmd(self):
     ''' get the command to start the topology health manager processes '''
@@ -509,12 +540,12 @@ class HeronExecutor(object):
                      "--topology_name", self.topology_name,
                      "--metricsmgr_port", self.metrics_manager_port]
 
-    return healthmgr_cmd
+    return Command(healthmgr_cmd, self.shell_env)
 
   def _get_tmaster_processes(self):
     ''' get the command to start the tmaster processes '''
     retval = {}
-    tmaster_cmd = [
+    tmaster_cmd_lst = [
         self.tmaster_binary,
         '--topology_name=%s' % self.topology_name,
         '--topology_id=%s' % self.topology_id,
@@ -529,8 +560,16 @@ class HeronExecutor(object):
         '--metrics_sinks_yaml=%s' % self.metrics_sinks_config_file,
         '--metricsmgr_port=%s' % str(self.metrics_manager_port),
         '--ckptmgr_port=%s' % str(self.checkpoint_manager_port)]
-    retval["heron-tmaster"] = tmaster_cmd
 
+    tmaster_env = self.shell_env.copy() if self.shell_env is not None else {}
+    tmaster_cmd = Command(tmaster_cmd_lst, tmaster_env)
+    if os.environ.get('ENABLE_HEAPCHECK') is not None:
+      tmaster_cmd.env.update({
+          'LD_PRELOAD': "/usr/lib/libtcmalloc.so",
+          'HEAPCHECK': "normal"
+      })
+
+    retval["heron-tmaster"] = tmaster_cmd
 
     if self.metricscache_manager_mode.lower() != "disabled":
       retval["heron-metricscache"] = self._get_metrics_cache_cmd()
@@ -565,8 +604,7 @@ class HeronExecutor(object):
       if self.jvm_remote_debugger_ports:
         remote_debugger_port = self.jvm_remote_debugger_ports.pop()
 
-      instance_cmd = []
-      instance_cmd.append(self._get_jvm_instance_cmd())             # JVM 
command
+      instance_cmd = self._get_jvm_instance_cmd().copy()            # JVM 
command
       instance_cmd.extend(                                          # JVM 
options
           self._get_jvm_instance_options(
               instance_id, component_name, remote_debugger_port))
@@ -580,7 +618,7 @@ class HeronExecutor(object):
     return retval
 
   def _get_jvm_instance_cmd(self):
-    return os.path.join(self.heron_java_home, 'bin/java')
+    return Command(os.path.join(self.heron_java_home, 'bin/java'), 
self.shell_env)
 
   def _get_jvm_instance_options(self, instance_id, component_name, 
remote_debugger_port):
     code_cache_size_mb = 64
@@ -700,7 +738,7 @@ class HeronExecutor(object):
                       '--topology_pex=%s' % self.topology_binary_file,
                       '--max_ram=%s' % 
str(self.component_ram_map[component_name])]
 
-      retval[instance_id] = instance_cmd
+      retval[instance_id] = Command(instance_cmd, self.shell_env)
 
     return retval
 
@@ -727,7 +765,7 @@ class HeronExecutor(object):
           '--topology_binary=%s' % os.path.abspath(self.topology_binary_file)
       ]
 
-      retval[instance_id] = instance_cmd
+      retval[instance_id] = Command(instance_cmd, self.shell_env)
 
     return retval
 
@@ -748,7 +786,7 @@ class HeronExecutor(object):
       instance_id = "container_%s_%s_%d" % (str(self.shard), component_name, 
global_task_id)
       instance_info.append((instance_id, component_name, global_task_id, 
component_index))
 
-    stmgr_cmd = [
+    stmgr_cmd_lst = [
         self.stmgr_binary,
         '--topology_name=%s' % self.topology_name,
         '--topology_id=%s' % self.topology_id,
@@ -767,6 +805,15 @@ class HeronExecutor(object):
         '--ckptmgr_port=%s' % str(self.checkpoint_manager_port),
         '--ckptmgr_id=%s' % self.ckptmgr_ids[self.shard],
         '--metricscachemgr_mode=%s' % self.metricscache_manager_mode.lower()]
+
+    stmgr_env = self.shell_env.copy() if self.shell_env is not None else {}
+    stmgr_cmd = Command(stmgr_cmd_lst, stmgr_env)
+    if os.environ.get('ENABLE_HEAPCHECK') is not None:
+      stmgr_cmd.env.update({
+          'LD_PRELOAD': "/usr/lib/libtcmalloc.so",
+          'HEAPCHECK': "normal"
+      })
+
     retval[self.stmgr_ids[self.shard]] = stmgr_cmd
 
     # metricsmgr_metrics_sink_config_file = 'metrics_sinks.yaml'
@@ -829,7 +876,7 @@ class HeronExecutor(object):
                    '-f' + self.stateful_config_file,
                    '-g' + self.heron_internals_config_file]
     retval = {}
-    retval[self.ckptmgr_ids[self.shard]] = ckptmgr_cmd
+    retval[self.ckptmgr_ids[self.shard]] = Command(ckptmgr_cmd, self.shell_env)
 
     return retval
 
@@ -856,11 +903,11 @@ class HeronExecutor(object):
     """ Get a map from all daemon services' name to the command to start them 
"""
     retval = {}
 
-    retval[self.heron_shell_ids[self.shard]] = [
+    retval[self.heron_shell_ids[self.shard]] = Command([
         '%s' % self.heron_shell_binary,
         '--port=%s' % self.shell_port,
         '--log_file_prefix=%s/heron-shell-%s.log' % (self.log_dir, self.shard),
-        '--secret=%s' % self.topology_id]
+        '--secret=%s' % self.topology_id], self.shell_env)
 
     return retval
 
@@ -876,14 +923,13 @@ class HeronExecutor(object):
     proc.stream_process_stdout(process, stdout_log_fn(name))
     process.wait()
 
-  def _run_process(self, name, cmd, env_to_exec=None):
-    Log.info("Running %s process as %s" % (name, ' '.join(cmd)))
+  def _run_process(self, name, cmd):
+    Log.info("Running %s process as %s" % (name, cmd))
     try:
       # stderr is redirected to stdout so that it can more easily be logged. 
stderr has a max buffer
       # size and can cause the child process to deadlock if it fills up
-      process = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
-                                 env=env_to_exec, bufsize=1)
-
+      process = subprocess.Popen(cmd.cmd, stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT,
+                                 env=cmd.env, bufsize=1)
       proc.async_stream_process_stdout(process, stdout_log_fn(name))
     except Exception:
       Log.info("Exception running command %s", cmd)
@@ -891,16 +937,16 @@ class HeronExecutor(object):
 
     return process
 
-  def _run_blocking_process(self, cmd, is_shell=False, env_to_exec=None):
+  def _run_blocking_process(self, cmd, is_shell=False):
     Log.info("Running blocking process as %s" % cmd)
     try:
       # stderr is redirected to stdout so that it can more easily be logged. 
stderr has a max buffer
       # size and can cause the child process to deadlock if it fills up
-      process = subprocess.Popen(cmd, shell=is_shell, stdout=subprocess.PIPE,
-                                 stderr=subprocess.STDOUT, env=env_to_exec)
+      process = subprocess.Popen(cmd.cmd, shell=is_shell, 
stdout=subprocess.PIPE,
+                                 stderr=subprocess.STDOUT, env=cmd.env)
 
       # wait for termination
-      self._wait_process_std_out_err(cmd, process)
+      self._wait_process_std_out_err(cmd.cmd, process)
     except Exception:
       Log.info("Exception running command %s", cmd)
       traceback.print_exc()
@@ -916,7 +962,7 @@ class HeronExecutor(object):
           if process_info.name == command_name:
             del self.processes_to_monitor[process_info.pid]
             Log.info("Killing %s process with pid %d: %s" %
-                     (process_info.name, process_info.pid, ' '.join(command)))
+                     (process_info.name, process_info.pid, command))
             try:
               process_info.process.terminate()  # sends SIGTERM to process
             except OSError as e:
@@ -932,7 +978,7 @@ class HeronExecutor(object):
     processes_to_monitor = {}
     # First start all the processes
     for (name, command) in commands.items():
-      p = self._run_process(name, command, self.shell_env)
+      p = self._run_process(name, command)
       processes_to_monitor[p.pid] = ProcessInfo(p, name, command)
 
       # Log down the pid file
@@ -967,7 +1013,7 @@ class HeronExecutor(object):
               Log.info("%s exited too many times" % name)
               sys.exit(1)
             time.sleep(self.interval_between_runs)
-            p = self._run_process(name, command, self.shell_env)
+            p = self._run_process(name, command)
             del self.processes_to_monitor[pid]
             self.processes_to_monitor[p.pid] =\
               ProcessInfo(p, name, command, old_process_info.attempts + 1)
@@ -1120,6 +1166,12 @@ def setup(executor):
     1. Terminate all children processes
     """
     Log.info('Executor terminated; exiting all process in executor.')
+
+    # Kill child processes first and wait for log collection to finish
+    for pid in executor.processes_to_monitor.keys():
+      os.kill(pid, signal.SIGTERM)
+    time.sleep(5)
+
     # We would not wait or check whether process spawned dead or not
     os.killpg(0, signal.SIGTERM)
 
diff --git a/heron/executor/tests/python/heron_executor_unittest.py 
b/heron/executor/tests/python/heron_executor_unittest.py
index 7571d14..d1debb0 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -47,6 +47,11 @@ def get_test_heron_internal_yaml():
 INTERNAL_CONF_PATH, OVERRIDE_PATH = get_test_heron_internal_yaml()
 HOSTNAME = socket.gethostname()
 
+class CommandEncoder(json.JSONEncoder):
+  """Customized JSONEncoder that works with Command object"""
+  def default(self, o):
+    return o.cmd
+
 class MockPOpen(object):
   """fake subprocess.Popen object that we can use to mock processes and pids"""
   next_pid = 0
@@ -335,7 +340,7 @@ class HeronExecutorTest(unittest.TestCase):
         map((lambda process_info: (process_info.name, 
process_info.command.split(' '))),
             self.expected_processes_container_1))
 
-    current_json = json.dumps(current_commands, sort_keys=True).split(' ')
+    current_json = json.dumps(current_commands, sort_keys=True, 
cls=CommandEncoder).split(' ')
     temp_json = json.dumps(temp_dict, sort_keys=True).split(' ')
 
     print ("current_json: %s" % current_json)

Reply via email to