This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 2dbb4c07f4409be16cc4395476ab28b369de76aa
Author: Reishin <[email protected]>
AuthorDate: Wed Jan 31 15:56:51 2018 +0200

    [AMBARI-22888] Cancel operation during package deployment causing repository manager to be broken (dgrinenko)
---
 .../src/test/python/ambari_agent/TestShell.py      | 174 ++++++++++++++-------
 .../src/main/python/ambari_commons/shell.py        | 116 ++++++++++----
 2 files changed, 201 insertions(+), 89 deletions(-)

diff --git a/ambari-agent/src/test/python/ambari_agent/TestShell.py b/ambari-agent/src/test/python/ambari_agent/TestShell.py
index c6bfa01..6a0538a 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestShell.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestShell.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -17,66 +17,130 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
+from contextlib import contextmanager
 
-from ambari_agent import main
-main.MEMORY_LEAK_DEBUG_FILEPATH = "/tmp/memory_leak_debug.out"
-import os
 import unittest
-import tempfile
+import signal
 from mock.mock import patch, MagicMock, call
-from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_commons import shell
-from ambari_commons.shell import shellRunner
-from sys import platform as _platform
-from only_for_platform import not_for_platform, PLATFORM_WINDOWS
-import subprocess, time
+from ambari_commons import OSCheck
+from StringIO import StringIO
+
+ROOT_PID = 10
+ROOT_PID_CHILDRENS = [10, 11, 12, 13]
+shell.logger = MagicMock()  # suppress any log output
+
+__proc_fs = {
+  "/proc/10/task/10/children": "11 12",
+  "/proc/10/comm": "a",
+  "/proc/10/cmdline": "",
+
+  "/proc/11/task/11/children": "13",
+  "/proc/11/comm": "b",
+  "/proc/11/cmdline": "",
+
+  "/proc/12/task/12/children": "",
+  "/proc/12/comm": "c",
+  "/proc/12/cmdline": "",
+
+  "/proc/13/task/13/children": "",
+  "/proc/13/comm": "d",
+  "/proc/13/cmdline": ""
+}
+
+__proc_fs_yum = {
+  "/proc/10/task/10/children": "11",
+  "/proc/10/comm": "a",
+  "/proc/10/cmdline": "",
+
+  "/proc/11/task/11/children": "",
+  "/proc/11/comm": "yum",
+  "/proc/11/cmdline": "yum install something"
+}
+
+
+class FakeSignals(object):
+  SIGTERM = signal.SIG_IGN
+  SIGKILL = signal.SIG_IGN
+
+
+@contextmanager
+def _open_mock(path, open_mode):
+  """Fake open(): serve contents from __proc_fs, or an empty file for unknown paths."""
+  if path in __proc_fs:
+    yield StringIO(__proc_fs[path])
+  else:
+    yield StringIO("")
+
+
+@contextmanager
+def _open_mock_yum(path, open_mode):
+  """Fake open(): serve contents from __proc_fs_yum, or an empty file for unknown paths."""
+  if path in __proc_fs_yum:
+    yield StringIO(__proc_fs_yum[path])
+  else:
+    yield StringIO("")
+
 
-@not_for_platform(PLATFORM_WINDOWS)
 class TestShell(unittest.TestCase):
 
+  @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock))
+  def test_get_all_children(self):
+
+    pid_list = [item[0] for item in shell.get_all_childrens(ROOT_PID)]
+
+    self.assertEquals(len(ROOT_PID_CHILDRENS), len(pid_list))
+    self.assertEquals(ROOT_PID, pid_list[0])
+
+    for i in ROOT_PID_CHILDRENS:
+      self.assertEquals(True, i in pid_list)
+
+  @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock))
+  @patch.object(OSCheck, "get_os_family", new=MagicMock(return_value="redhat"))
+  @patch.object(shell, "signal", new_callable=FakeSignals)
+  @patch.object(shell, "is_pid_life")
+  @patch("os.kill")
+  def test_kill_process_with_children(self, os_kill_mock, is_pid_life_mock, fake_signals):
+    pid_list = [item[0] for item in shell.get_all_childrens(ROOT_PID)]
+    reverse_pid_list = sorted(pid_list, reverse=True)
+    shell.gracefull_kill_delay = 0.1
+    is_pid_life_clean_kill = [True] * len(pid_list) + [False] * len(pid_list)
+    is_pid_life_not_clean_kill = [True] * (len(pid_list) * 2)
+
+    is_pid_life_mock.side_effect = is_pid_life_clean_kill
+    shell.kill_process_with_children(ROOT_PID)
+
+    # test clean pid by SIGTERM
+    os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list]
+    self.assertEquals(len(os_kill_pids), len(pid_list))
+    self.assertEquals(reverse_pid_list, os_kill_pids)
+
+    os_kill_mock.reset_mock()
+    is_pid_life_mock.reset_mock()
+
+    is_pid_life_mock.side_effect = is_pid_life_not_clean_kill
+    shell.kill_process_with_children(ROOT_PID)
+
+    # test clean pid by SIGKILL
+    os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list]
+    self.assertEquals(len(os_kill_pids), len(pid_list)*2)
+    self.assertEquals(reverse_pid_list + reverse_pid_list, os_kill_pids)
+
+  @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock_yum))
+  @patch.object(OSCheck, "get_os_family", new=MagicMock(return_value="redhat"))
+  @patch.object(shell, "signal", new_callable=FakeSignals)
+  @patch.object(shell, "is_pid_life")
+  @patch("os.kill")
+  def test_kill_process_with_children_except_yum(self, os_kill_mock, is_pid_life_mock, fake_signals):
+    shell.gracefull_kill_delay = 0.1
+    is_pid_life_clean_kill = [True, False, True, False]  # used here only first pair
+
+    is_pid_life_mock.side_effect = is_pid_life_clean_kill
+    shell.kill_process_with_children(ROOT_PID)
+
+    # test clean pid by SIGTERM
+    os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list]
+    self.assertEquals(len(os_kill_pids), 1)
+    self.assertEquals([10], os_kill_pids)
 
-  @patch("os.setuid")
-  def test_changeUid(self, os_setUIDMock):
-    shell.threadLocal.uid = 9999
-    shell.changeUid()
-    self.assertTrue(os_setUIDMock.called)
-
-
-  @patch("pwd.getpwnam")
-  def test_shellRunner_run(self, getpwnamMock):
-    sh = shellRunner()
-    result = sh.run(['echo'])
-    self.assertEquals(result['exitCode'], 0)
-    self.assertEquals(result['error'], '')
-
-    getpwnamMock.return_value = [os.getuid(), os.getuid(), os.getuid()]
-    result = sh.run(['echo'], 'non_exist_user_name')
-    self.assertEquals(result['exitCode'], 0)
-    self.assertEquals(result['error'], '')
-
-  def test_kill_process_with_children(self):
-    if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
-      gracefull_kill_delay_old = shell.gracefull_kill_delay
-      shell.gracefull_kill_delay = 0.1
-      sleep_cmd = "sleep 314159265"
-      test_cmd = """ (({0}) & ({0} & {0})) """.format(sleep_cmd)
-      # Starting process tree (multiple process groups)
-      test_process = subprocess.Popen(test_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
-      time.sleep(0.3) # Delay to allow subprocess to start
-      # Check if processes are running
-      ps_cmd = """ps auxww """
-      ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
-      (out, err) = ps_process.communicate()
-      self.assertTrue(sleep_cmd in out)
-      # Kill test process
-      shell.kill_process_with_children(test_process.pid)
-      test_process.communicate()
-      # Now test process should not be running
-      ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
-      (out, err) = ps_process.communicate()
-      self.assertFalse(sleep_cmd in out)
-      shell.gracefull_kill_delay = gracefull_kill_delay_old
-    else:
-      # Do not run under other systems
-      pass
diff --git a/ambari-common/src/main/python/ambari_commons/shell.py b/ambari-common/src/main/python/ambari_commons/shell.py
index 021e495..a4b3263 100644
--- a/ambari-common/src/main/python/ambari_commons/shell.py
+++ b/ambari-common/src/main/python/ambari_commons/shell.py
@@ -231,42 +231,107 @@ class shellRunnerWindows(shellRunner):
     return _dict_to_object({'exitCode': code, 'output': out, 'error': err})
 
 
-#linux specific code
-@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def kill_process_with_children(parent_pid):
-  def kill_tree_function(pid, signal):
-    '''
-    Kills process tree starting from a given pid.
-    '''
-    # The command below starts 'ps' linux utility and then parses it's
-    # output using 'awk'. AWK recursively extracts PIDs of all children of
-    # a given PID and then passes list of "kill -<SIGNAL> PID" commands to 'sh'
-    # shell.
-    CMD = """ps xf | awk -v PID=""" + str(pid) + \
-          """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
-          """K=P } P && !/_/ { P="" }  END { print "kill -""" \
-          + str(signal) + """ "K }' | sh """
-    process = subprocess.Popen(CMD, stdout=subprocess.PIPE,
-                               stderr=subprocess.PIPE, shell=True)
-    process.communicate()
-
-  _run_kill_function(kill_tree_function, parent_pid)
-
-
-def _run_kill_function(kill_function, pid):
-  try:
-    kill_function(pid, signal.SIGTERM)
-  except Exception, e:
-    logger.warn("Failed to kill PID %d" % (pid))
-    logger.warn("Reported error: " + repr(e))
+def get_all_childrens(base_pid):
+  """
+  Return base_pid and all of its descendants as (pid, comm, cmdline) tuples read from /proc,
+  in breadth-first order, so every process is listed before its own children.
+  Only children of each process's main thread are found (/proc/<pid>/task/<pid>/children).
+
+  :type base_pid int
+  :rtype list[(int, str, str)]
+  """
+  parent_pid_path_pattern = "/proc/{0}/task/{0}/children"
+  comm_path_pattern = "/proc/{0}/comm"
+  cmdline_path_pattern = "/proc/{0}/cmdline"
+
+  def read_childrens(pid):
+    try:
+      with open(parent_pid_path_pattern.format(pid), "r") as f:
+        return [int(item) for item in f.readline().split()]
+    except (IOError, ValueError):
+      return []
+
+  def read_command(pid):
+    try:
+      with open(comm_path_pattern.format(pid), "r") as f:
+        return f.readline().strip()
+    except IOError:
+      return ""
+
+  def read_cmdline(pid):
+    try:
+      with open(cmdline_path_pattern.format(pid), "r") as f:
+        # arguments are NUL-separated and the file has no trailing newline
+        return f.read().replace("\0", " ").strip()
+    except IOError:
+      return ""
 
-  time.sleep(gracefull_kill_delay)
+  done = []
+  pending = [int(base_pid)]
 
+  while pending:
+    mypid = pending.pop(0)
+    children = read_childrens(mypid)
+
+    done.append((mypid, read_command(mypid), read_cmdline(mypid)))
+    pending.extend(children)
+
+  return done
+
+
+def is_pid_life(pid):
+  """
+  Return True if /proc/<pid> exists, regardless of the process state (an unreaped zombie still counts).
+
+  :type pid int
+  """
+  pid_path = "/proc/{0}"
   try:
-    kill_function(pid, signal.SIGKILL)
-  except Exception, e:
-    logger.error("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
-    logger.error("Reported error: " + repr(e))
+    return os.path.exists(pid_path.format(pid))
+  except Exception:
+    logger.debug("Failed to check pid state")
+    return False
+
+
+# linux specific code
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def kill_process_with_children(parent_pid):
+  """
+  Kill parent_pid and all of its descendants, deepest processes first.
+
+  Each process is sent SIGTERM and, if it is still alive gracefull_kill_delay seconds later,
+  SIGKILL. Processes whose comm is in exception_list (package managers) and the current process
+  are never signalled, so cancelling a command does not interrupt a running package manager.
+  """
+  import errno
+  exception_list = ["apt-get", "apt", "yum", "zypper", "zypp"]
+  signals_to_post = [signal.SIGTERM, signal.SIGKILL]
+  all_chield_pids = [item[0] for item in get_all_childrens(parent_pid) if item[1].lower() not in exception_list and item[0] != os.getpid()]
+  # get_all_childrens lists every process before its children, so the reversed list signals
+  # the deepest processes first; sorting by pid is not equivalent because pids wrap around
+  pids_to_kill = list(reversed(all_chield_pids))
+  clean_kill = True
+  last_error = ""
+
+  for sig in signals_to_post:
+    for pid in pids_to_kill:
+      try:
+        if is_pid_life(pid):
+          os.kill(pid, sig)
+      except Exception as e:
+        # the process may exit between the liveness check and os.kill(); that is not a failure
+        if getattr(e, "errno", None) == errno.ESRCH:
+          continue
+        clean_kill = False
+        last_error = repr(e)
+
+    if pids_to_kill:
+      time.sleep(gracefull_kill_delay)
+
+  logger.info("Killed process tree starting from main pid {0}: {1}".format(parent_pid, ", ".join([str(i) for i in all_chield_pids])))
+  if not clean_kill:
+    logger.warn("Failed to kill some child of PID {0} tree".format(parent_pid))
+    logger.warn("Reported error: " + last_error)
 
 
 def _changeUid():

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to