Repository: incubator-slider
Updated Branches:
  refs/heads/develop 8234cbe4e -> 4032999f3


SLIDER-1264 slider-agent kills other processes when a commandScript times out


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4032999f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4032999f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4032999f

Branch: refs/heads/develop
Commit: 4032999f35db4877b6b8ffc5e97a59837e22365b
Parents: 8234cbe
Author: kwnam <kw...@apache.org>
Authored: Sat Mar 24 12:43:32 2018 +0900
Committer: kwnam <kw...@apache.org>
Committed: Sat Mar 24 12:43:32 2018 +0900

----------------------------------------------------------------------
 .../src/main/python/agent/process_utils.py      | 100 +++++++++
 slider-agent/src/main/python/agent/shell.py     |  66 +++---
 .../src/test/python/agent/TestProcessUtils.py   | 221 +++++++++++++++++++
 slider-agent/src/test/python/agent/TestShell.py |   3 -
 4 files changed, 350 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/main/python/agent/process_utils.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/process_utils.py 
b/slider-agent/src/main/python/agent/process_utils.py
new file mode 100644
index 0000000..50ffd02
--- /dev/null
+++ b/slider-agent/src/main/python/agent/process_utils.py
@@ -0,0 +1,100 @@
+# !/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import subprocess
+import time
+
+# Seconds to sleep between successive checks of whether a process was killed.
+check_time_delay = 0.1
+
+
+def get_children(pid):
+  """
+  Lists the direct children of a process.
+
+  :param pid: process id of the parent process (rendered with str())
+  :return: list of child pids as split from the ps output, or an empty list
+           when ps exits with a non-zero status
+  """
+  ps_command = ["ps", "-o", "pid", "--no-headers", "--ppid", str(pid)]
+  ps = subprocess.Popen(ps_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+  out, _ = ps.communicate()
+  return out.split() if ps.returncode == 0 else []
+
+
+def get_flat_process_tree(pid):
+  """
+  Flattens the process tree rooted at a pid into a single list.
+
+  The tree is walked depth-first in pre-order: every parent precedes its
+  descendants, and siblings keep the order in which ps reported them.
+
+  :param pid: process id of the root (parent) process
+  :return: list of pid strings; the first element is the root pid itself
+  """
+  flat = []
+  to_visit = [pid]
+  while to_visit:
+    current = to_visit.pop()
+    flat.append(str(current))
+    # Push children reversed so the first-listed child is visited next.
+    to_visit.extend(reversed(get_children(current)))
+  return flat
+
+
+def kill_pids(pids, signal):
+  """
+  Sends one signal to a list of processes with a single `kill` invocation.
+
+  :param pids: list of pid strings, appended as-is to the kill command line
+  :param signal: signal to send, e.g. signal.SIGTERM; int-like values are
+                 passed as their number, anything else via str()
+  :raise Fail: if `kill` exits with a non-zero status
+  """
+  from resource_management.core.exceptions import Fail
+  if not pids:
+    # Nothing to signal: do not run `kill` without any pid operands.
+    return
+  # int() keeps the numeric "-15" form even for IntEnum signal constants,
+  # whose str() is "Signals.SIGTERM" on Python 3.5-3.10.
+  signal_arg = int(signal) if isinstance(signal, int) else signal
+  CMD = ["kill", "-" + str(signal_arg)]
+  CMD.extend(pids)
+  process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+  stdout, stderr = process.communicate()
+  if process.returncode != 0:
+    raise Fail("Unable to kill PIDs {0} : {1}".format(str(pids), stderr))
+
+
+def get_command_by_pid(pid):
+  """
+  Looks up the command line of a process.
+
+  :param pid: process id to look up
+  :return: the raw ps output for the process, or "NOT_FOUND[<pid>]" when ps
+           exits with a non-zero status
+  """
+  ps = subprocess.Popen(["ps", "-p", str(pid), "-o", "command", "--no-headers"],
+                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+  out, _ = ps.communicate()
+  if ps.returncode == 0:
+    return out
+  return "NOT_FOUND[%s]" % pid
+
+
+def wait_for_entire_process_tree_death(pids, timeout=5):
+  """
+  Waits for all of the given processes to exit, for at most `timeout` seconds
+  in total.
+
+  The time budget is shared by the whole list rather than granted per pid, so
+  the wait stays bounded however many processes the tree contains. Processes
+  still alive when the budget runs out are not reported here; callers can
+  check them afterwards, e.g. with get_processes_running().
+
+  :param pids: list of pid strings
+  :param timeout: total number of seconds to wait for the whole list
+  """
+  deadline = time.time() + timeout
+  for child in pids:
+    remaining = deadline - time.time()
+    if remaining <= 0:
+      break
+    wait_for_process_death(child, remaining)
+
+
+def wait_for_process_death(pid, timeout=5):
+  """
+  Polls until a process exits or `timeout` seconds have elapsed.
+
+  The process is not signalled, and nothing is returned whether or not it
+  actually exited.
+
+  :param pid: process id to watch
+  :param timeout: maximum number of seconds to keep polling
+  """
+  began = time.time()
+  now = began
+  while is_process_running(pid):
+    if now >= began + timeout:
+      break
+    time.sleep(check_time_delay)
+    now = time.time()
+
+
+def is_process_running(pid):
+  """
+  Checks whether ps reports a process with the given pid.
+
+  :param pid: process id, as a string or an int
+  :return: True if the pid appears in the ps output, False otherwise
+           (including when ps exits with a non-zero status)
+  """
+  pid = str(pid)
+  CMD = ["ps", "-p", pid, "-o", "pid", "--no-headers"]
+  process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+  stdout, stderr = process.communicate()
+  if process.returncode != 0:
+    return False
+  # Match whole tokens: a raw `pid in stdout` raises TypeError for int pids
+  # and would also accept a pid that is merely a substring of the output.
+  return pid in stdout.split()
+
+
+
+def get_processes_running(process_pids):
+  """
+  Filters a list of pids down to the processes that are still alive.
+
+  :param process_pids: list of process pids
+  :return: the pids for which is_process_running() is true, in input order
+  """
+  return [pid for pid in process_pids if is_process_running(pid)]

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/main/python/agent/shell.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/shell.py 
b/slider-agent/src/main/python/agent/shell.py
index 446dde9..f22d535 100644
--- a/slider-agent/src/main/python/agent/shell.py
+++ b/slider-agent/src/main/python/agent/shell.py
@@ -19,16 +19,13 @@ limitations under the License.
 '''
 
 import logging
-import subprocess
 import os
-import tempfile
 import signal
-import sys
+import subprocess
 import threading
-import time
-import traceback
-import pprint
 import platform
+from process_utils import get_flat_process_tree, kill_pids, 
wait_for_entire_process_tree_death, \
+  get_processes_running, get_command_by_pid
 
 if platform.system() != "Windows":
   try:
@@ -42,7 +39,6 @@ logger = logging.getLogger()
 
 shellRunner = None
 threadLocal = threading.local()
-gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL
 tempFiles = [] 
 def noteTempFile(filename):
   tempFiles.append(filename)
@@ -92,40 +88,36 @@ class shellRunnerWindows:
     logger.debug("Exitcode for %s is %d" % (cmd, code))
     return _dict_to_object({'exitCode': code, 'output': out, 'error': err})
 
-
 #linux specific code
 def _kill_process_with_children_linux(parent_pid):
-  def kill_tree_function(pid, signal):
-    '''
+    """
     Kills process tree starting from a given pid.
-    '''
-    # The command below starts 'ps' linux utility and then parses it's
-    # output using 'awk'. AWK recursively extracts PIDs of all children of
-    # a given PID and then passes list of "kill -<SIGNAL> PID" commands to 'sh'
-    # shell.
-    CMD = """ps xf | awk -v PID=""" + str(pid) + \
-          """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
-          """K=P } P && !/_/ { P="" }  END { print "kill -""" \
-          + str(signal) + """ "K }' | sh """
-    process = subprocess.Popen(CMD, stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE, shell=True)
-    process.communicate()
-  _run_kill_function(kill_tree_function, parent_pid)
-
-def _run_kill_function(kill_function, pid):
-  try:
-    kill_function(pid, signal.SIGTERM)
-  except Exception, e:
-    logger.warn("Failed to kill PID %d" % (pid))
-    logger.warn("Reported error: " + repr(e))
-
-  time.sleep(gracefull_kill_delay)
+
+    Snapshots the tree rooted at parent_pid, sends SIGTERM to every process
+    in it, waits (with a timeout) for them to exit, then sends SIGKILL to
+    whatever is still running.
+
+    :param parent_pid: pid of the root of the process tree
+    :return: None; errors raised while sending SIGTERM or SIGKILL are logged
+             rather than propagated
+    """
+    pids = get_flat_process_tree(parent_pid)
+    logger.info("Process tree: %s", ",".join(pids))
+    try:
+      kill_pids(pids, signal.SIGTERM)
+    except Exception as e:
+      # %s (not %d) so the handler cannot itself fail on a non-int pid.
+      logger.warning("Failed to kill PID %s", parent_pid)
+      logger.warning("Reported error: " + repr(e))
+
+    # Give the tree a chance to shut down cleanly before escalating.
+    wait_for_entire_process_tree_death(pids)
+
+    try:
+      running_processes = get_processes_running(pids)
+      if running_processes:
+        process_names = [get_command_by_pid(pid) for pid in running_processes]
+        logger.warning("These PIDs %s did not die after SIGTERM, sending SIGKILL. Exact commands to be killed:\n %s",
+                       ", ".join(running_processes), "\n".join(process_names))
+        kill_pids(running_processes, signal.SIGKILL)
+    except Exception as e:
+      logger.error("Failed to send SIGKILL to PID %s. Process exited?", parent_pid)
+      logger.error("Reported error: " + repr(e))

-  try:
-    kill_function(pid, signal.SIGKILL)
-  except Exception, e:
-    logger.error("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
-    logger.error("Reported error: " + repr(e))
 
 def _changeUid():
   try:

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/test/python/agent/TestProcessUtils.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestProcessUtils.py 
b/slider-agent/src/test/python/agent/TestProcessUtils.py
new file mode 100644
index 0000000..d3c708e
--- /dev/null
+++ b/slider-agent/src/test/python/agent/TestProcessUtils.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import unittest
+import signal
+import subprocess, time
+from mock.mock import patch, MagicMock, PropertyMock, call
+from agent import process_utils
+
+# Fake `ps --ppid` output, keyed by parent pid, for the tree-walking test.
+process_tree = {
+  "111": "222\n 22",
+  "222": "333\n 33",
+  "22": "44\n 444",
+}
+
+
+def _make_process_mock(stdout, returncode):
+  """
+  Builds a fake Popen object for process_utils' subprocess calls.
+
+  :param stdout: value returned as stdout by communicate()
+  :param returncode: value returned by the returncode property
+  :return: MagicMock whose communicate() returns (stdout, None)
+  """
+  process_mock = MagicMock()
+  process_mock.communicate.return_value = (stdout, None)
+  # returncode is read as an attribute, so install it as a property on the
+  # mock's own type.
+  type(process_mock).returncode = PropertyMock(return_value=returncode)
+  return process_mock
+
+
+def _ps_pid_call(pid):
+  """Returns the Popen call is_process_running() is expected to make for pid."""
+  return call(['ps', '-p', pid, '-o', 'pid', '--no-headers'],
+              stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+
+
+class TestProcessUtils(unittest.TestCase):
+  @patch("subprocess.Popen")
+  def test_kill(self, popen_mock):
+    popen_mock.return_value = _make_process_mock(None, 0)
+
+    process_utils.kill_pids(["12321113230", "2312415453"], signal.SIGTERM)
+
+    expected = [call(['kill', '-15', '12321113230', '2312415453'],
+                     stderr=subprocess.PIPE, stdout=subprocess.PIPE)]
+    self.assertEqual(popen_mock.call_args_list, expected)
+
+  @patch("subprocess.Popen")
+  def test_get_children(self, popen_mock):
+    popen_mock.return_value = _make_process_mock("123 \n \n 321\n", 0)
+
+    result = process_utils.get_children("2312415453")
+
+    self.assertEqual(result, ["123", "321"])
+    expected = [call(['ps', '-o', 'pid', '--no-headers', '--ppid', '2312415453'],
+                     stderr=subprocess.PIPE, stdout=subprocess.PIPE)]
+    self.assertEqual(popen_mock.call_args_list, expected)
+
+  @patch("subprocess.Popen")
+  def test_get_flat_process_tree(self, popen_mock):
+    def side_effect(*args, **kwargs):
+      # args[0][5] is the pid that follows "--ppid" in the ps command line.
+      return _make_process_mock(process_tree.get(args[0][5], ""), 0)
+
+    popen_mock.side_effect = side_effect
+
+    result = process_utils.get_flat_process_tree("111")
+
+    visit_order = ['111', '222', '333', '33', '22', '44', '444']
+    self.assertEqual(result, visit_order)
+    expected = [call(['ps', '-o', 'pid', '--no-headers', '--ppid', pid],
+                     stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+                for pid in visit_order]
+    self.assertEqual(popen_mock.call_args_list, expected)
+
+  @patch("subprocess.Popen")
+  def test_get_command_by_pid(self, popen_mock):
+    popen_mock.return_value = _make_process_mock("yum something", 0)
+
+    result = process_utils.get_command_by_pid("2312415453")
+
+    self.assertEqual(result, "yum something")
+    expected = [call(['ps', '-p', '2312415453', '-o', 'command', '--no-headers'],
+                     stderr=subprocess.PIPE, stdout=subprocess.PIPE)]
+    self.assertEqual(popen_mock.call_args_list, expected)
+
+  @patch("subprocess.Popen")
+  def test_get_command_by_pid_not_exist(self, popen_mock):
+    popen_mock.return_value = _make_process_mock("", 1)
+
+    result = process_utils.get_command_by_pid("2312415453")
+
+    self.assertEqual(result, "NOT_FOUND[2312415453]")
+    expected = [call(['ps', '-p', '2312415453', '-o', 'command', '--no-headers'],
+                     stderr=subprocess.PIPE, stdout=subprocess.PIPE)]
+    self.assertEqual(popen_mock.call_args_list, expected)
+
+  @patch("subprocess.Popen")
+  def test_is_process_running(self, popen_mock):
+    popen_mock.return_value = _make_process_mock("2312415453", 0)
+
+    result = process_utils.is_process_running("2312415453")
+
+    self.assertEqual(result, True)
+    self.assertEqual(popen_mock.call_args_list, [_ps_pid_call("2312415453")])
+
+  @patch("subprocess.Popen")
+  def test_is_process_not_running(self, popen_mock):
+    popen_mock.return_value = _make_process_mock("", 1)
+
+    result = process_utils.is_process_running("2312415453")
+
+    self.assertEqual(result, False)
+    self.assertEqual(popen_mock.call_args_list, [_ps_pid_call("2312415453")])
+
+  @patch("subprocess.Popen")
+  def test_get_processes_running(self, popen_mock):
+    def side_effect(*args, **kwargs):
+      # Only pid 4321 (args[0][2], the value after "-p") is reported alive.
+      if args[0][2] == "4321":
+        return _make_process_mock("4321", 0)
+      return _make_process_mock(None, 1)
+
+    popen_mock.side_effect = side_effect
+
+    result = process_utils.get_processes_running(["1234", "4321"])
+
+    self.assertEqual(result, ["4321"])
+    self.assertEqual(popen_mock.call_args_list,
+                     [_ps_pid_call("1234"), _ps_pid_call("4321")])
+
+  @patch("time.sleep")
+  @patch("subprocess.Popen")
+  def test_wait_for_process_death(self, popen_mock, sleep_mock):
+    # The process is reported alive twice, then gone.
+    process_mock = MagicMock()
+    process_mock.communicate.side_effect = [("4321", None), ("4321", None), (None, None)]
+    type(process_mock).returncode = PropertyMock(side_effect=[0, 0, 1])
+    popen_mock.return_value = process_mock
+
+    process_utils.wait_for_process_death("4321")
+
+    self.assertEqual(popen_mock.call_args_list, [_ps_pid_call("4321")] * 3)
+    self.assertEqual(sleep_mock.call_args_list, [call(0.1)] * 2)
+
+  @patch("time.sleep")
+  @patch("subprocess.Popen")
+  def test_wait_for_entire_process_tree_death(self, popen_mock, sleep_mock):
+    # 1234 is alive once then gone; 4321 is alive twice then gone.
+    process_mock = MagicMock()
+    process_mock.communicate.side_effect = [("1234", None), (None, None),
+                                            ("4321", None), ("4321", None),
+                                            (None, None)]
+    type(process_mock).returncode = PropertyMock(side_effect=[0, 1, 0, 0, 1])
+    popen_mock.return_value = process_mock
+
+    process_utils.wait_for_entire_process_tree_death(["1234", "4321"])
+
+    expected = [_ps_pid_call("1234")] * 2 + [_ps_pid_call("4321")] * 3
+    self.assertEqual(popen_mock.call_args_list, expected)
+    self.assertEqual(sleep_mock.call_args_list, [call(0.1)] * 3)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4032999f/slider-agent/src/test/python/agent/TestShell.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestShell.py 
b/slider-agent/src/test/python/agent/TestShell.py
index 8caed7b..a707e09 100644
--- a/slider-agent/src/test/python/agent/TestShell.py
+++ b/slider-agent/src/test/python/agent/TestShell.py
@@ -50,8 +50,6 @@ class TestShell(unittest.TestCase):
       return
 
     if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
-      gracefull_kill_delay_old = shell.gracefull_kill_delay
-      shell.gracefull_kill_delay = 0.1
       sleep_cmd = "sleep 10"
       test_cmd = """ (({0}) & ({0} & {0})) """.format(sleep_cmd)
       # Starting process tree (multiple process groups)
@@ -69,7 +67,6 @@ class TestShell(unittest.TestCase):
       ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, 
stdout=subprocess.PIPE, shell=True)
       (out, err) = ps_process.communicate()
       self.assertFalse(sleep_cmd in out)
-      shell.gracefull_kill_delay = gracefull_kill_delay_old
     else:
       # Do not run under other systems
       pass

Reply via email to