Repository: ambari Updated Branches: refs/heads/trunk ea1c9f1ac -> 79cffa16d
AMBARI-8916. Show output of Execute commands concurrently (for timeouts, long ops) (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79cffa16 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79cffa16 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79cffa16 Branch: refs/heads/trunk Commit: 79cffa16dcf08a5baddbc861cf0212449deb5956 Parents: ea1c9f1 Author: Andrew Onishuk <[email protected]> Authored: Thu Dec 25 16:18:28 2014 +0200 Committer: Andrew Onishuk <[email protected]> Committed: Thu Dec 25 16:18:28 2014 +0200 ---------------------------------------------------------------------- .../TestExecuteHadoopResource.py | 6 +- .../resource_management/TestExecuteResource.py | 10 +- .../resource_management/TestGroupResource.py | 5 + .../resource_management/TestUserResource.py | 11 ++ .../python/resource_management/core/logger.py | 4 + .../core/providers/system.py | 4 +- .../core/resources/system.py | 17 ++- .../python/resource_management/core/shell.py | 107 ++++++++++++------- .../libraries/resources/execute_hadoop.py | 2 +- .../package/scripts/datanode_upgrade.py | 4 +- .../HDFS/2.1.0.2.0/package/scripts/namenode.py | 25 ++--- .../HDFS/2.1.0.2.0/package/scripts/utils.py | 2 +- .../package/scripts/hive_server_upgrade.py | 4 +- .../0.8.1.2.2/package/scripts/service_check.py | 81 +++++++------- .../YARN/package/scripts/nodemanager_upgrade.py | 4 +- .../KERBEROS/package/scripts/kerberos_common.py | 4 +- .../python/stacks/2.0.6/HDFS/test_datanode.py | 22 ++-- .../python/stacks/2.0.6/HDFS/test_namenode.py | 27 ++--- .../stacks/2.0.6/HIVE/test_hive_server.py | 28 +---- .../stacks/2.0.6/YARN/test_nodemanager.py | 22 ++-- .../src/test/python/stacks/utils/RMFTestCase.py | 17 ++- 21 files changed, 222 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py index e4fe8df..ae137a5 100644 --- a/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py +++ b/ambari-agent/src/test/python/resource_management/TestExecuteHadoopResource.py @@ -65,7 +65,7 @@ class TestExecuteHadoopResource(TestCase): self.assertEqual(execute_mock.call_count, 1) self.assertEqual(execute_mock.call_args[0][0].command,'hadoop --config conf_dir command') self.assertEqual(execute_mock.call_args[0][0].arguments, - {'logoutput': False, + {'logoutput': None, 'tries': 1, 'user': 'user', 'try_sleep': 0, @@ -122,14 +122,14 @@ class TestExecuteHadoopResource(TestCase): self.assertEqual(execute_mock.call_args_list[1][0][0].command, 'hadoop --config conf_dir command2') self.assertEqual(execute_mock.call_args_list[0][0][0].arguments, - {'logoutput': False, + {'logoutput': None, 'tries': 1, 'user': 'user', 'environment': {}, 'try_sleep': 0, 'path': []}) self.assertEqual(execute_mock.call_args_list[1][0][0].arguments, - {'logoutput': False, + {'logoutput': None, 'tries': 1, 'user': 'user', 'try_sleep': 0, http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestExecuteResource.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py index 87a637c..a0b375b 100644 --- a/ambari-agent/src/test/python/resource_management/TestExecuteResource.py +++ b/ambari-agent/src/test/python/resource_management/TestExecuteResource.py @@ -38,6 +38,7 @@ class 
TestExecuteResource(TestCase): def test_attribute_logoutput(self, popen_mock, info_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) subproc_mock.communicate.side_effect = [["1"], ["2"]] popen_mock.return_value = subproc_mock @@ -69,6 +70,7 @@ class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) subproc_mock.communicate.side_effect = [["1"]] popen_mock.return_value = subproc_mock @@ -83,6 +85,7 @@ class TestExecuteResource(TestCase): def test_attribute_path(self, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) subproc_mock.communicate.side_effect = [["1"]] popen_mock.return_value = subproc_mock @@ -100,7 +103,7 @@ class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 - subproc_mock.communicate.side_effect = [Fail("Fail"), ["1"]] + subproc_mock.stdout.readline = MagicMock(side_effect = [Fail("Fail"), "OK"]) popen_mock.return_value = subproc_mock with Environment("/") as env: @@ -150,6 +153,7 @@ class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) subproc_mock.communicate.side_effect = [["1"]] popen_mock.return_value = subproc_mock @@ -167,6 +171,7 @@ class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) subproc_mock.communicate.side_effect = [["1"]] popen_mock.return_value = subproc_mock @@ -188,6 +193,7 @@ class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) subproc_mock.communicate.side_effect = [["1"]] popen_mock.return_value = subproc_mock @@ -209,6 +215,7 @@ 
class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock with Environment("/") as env: @@ -238,6 +245,7 @@ class TestExecuteResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock with Environment("/") as env: http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestGroupResource.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestGroupResource.py b/ambari-agent/src/test/python/resource_management/TestGroupResource.py index 29decbb..7ef487a 100644 --- a/ambari-agent/src/test/python/resource_management/TestGroupResource.py +++ b/ambari-agent/src/test/python/resource_management/TestGroupResource.py @@ -36,6 +36,7 @@ class TestGroupResource(TestCase): def test_action_create_nonexistent(self, popen_mock, getgrnam_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getgrnam_mock.side_effect = KeyError() @@ -56,6 +57,7 @@ class TestGroupResource(TestCase): def test_action_create_existent(self, popen_mock, getgrnam_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getgrnam_mock.return_value = "mapred" @@ -77,6 +79,7 @@ class TestGroupResource(TestCase): def test_action_create_fail(self, popen_mock, getgrnam_mock): subproc_mock = MagicMock() subproc_mock.returncode = 1 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getgrnam_mock.return_value = "mapred" @@ -102,6 +105,7 @@ class 
TestGroupResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getgrnam_mock.return_value = "mapred" @@ -122,6 +126,7 @@ class TestGroupResource(TestCase): subproc_mock = MagicMock() subproc_mock.returncode = 1 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getgrnam_mock.return_value = "mapred" http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-agent/src/test/python/resource_management/TestUserResource.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestUserResource.py b/ambari-agent/src/test/python/resource_management/TestUserResource.py index 458053b..f66b738 100644 --- a/ambari-agent/src/test/python/resource_management/TestUserResource.py +++ b/ambari-agent/src/test/python/resource_management/TestUserResource.py @@ -35,6 +35,7 @@ class TestUserResource(TestCase): def test_action_create_nonexistent(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = None with Environment('/') as env: @@ -48,6 +49,7 @@ class TestUserResource(TestCase): def test_action_create_existent(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -62,6 +64,7 @@ class TestUserResource(TestCase): def test_action_delete(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -76,6 +79,7 @@ class 
TestUserResource(TestCase): def test_attribute_comment(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -91,6 +95,7 @@ class TestUserResource(TestCase): def test_attribute_home(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -106,6 +111,7 @@ class TestUserResource(TestCase): def test_attribute_password(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -121,6 +127,7 @@ class TestUserResource(TestCase): def test_attribute_shell(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -135,6 +142,7 @@ class TestUserResource(TestCase): def test_attribute_uid(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -149,6 +157,7 @@ class TestUserResource(TestCase): def test_attribute_gid(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -163,6 +172,7 @@ class TestUserResource(TestCase): def test_attribute_groups(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = 
MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = 1 @@ -178,6 +188,7 @@ class TestUserResource(TestCase): def test_missing_shell_argument(self, getpwnam_mock, popen_mock): subproc_mock = MagicMock() subproc_mock.returncode = 0 + subproc_mock.stdout.readline = MagicMock(side_effect = ['OK']) popen_mock.return_value = subproc_mock getpwnam_mock.return_value = None with Environment('/') as env: http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/logger.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/logger.py b/ambari-common/src/main/python/resource_management/core/logger.py index 6ff40d6..69a55fb 100644 --- a/ambari-common/src/main/python/resource_management/core/logger.py +++ b/ambari-common/src/main/python/resource_management/core/logger.py @@ -105,8 +105,12 @@ class Logger: val = oct(y) except: val = repr(y) + # for functions show only function name + elif hasattr(y, '__call__') and hasattr(y, '__name__'): + val = y.__name__ else: val = repr(y) + arguments_str += "'{0}': {1}, ".format(x, val) http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/providers/system.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/providers/system.py b/ambari-common/src/main/python/resource_management/core/providers/system.py index b3949df..fc06a78 100644 --- a/ambari-common/src/main/python/resource_management/core/providers/system.py +++ b/ambari-common/src/main/python/resource_management/core/providers/system.py @@ -258,8 +258,8 @@ class ExecuteProvider(Provider): wait_for_finish=self.resource.wait_for_finish, timeout=self.resource.timeout, path=self.resource.path, - output_file=self.resource.output_file, - 
sudo=self.resource.sudo) + sudo=self.resource.sudo, + on_new_line=self.resource.on_new_line) break except Fail as ex: if i == self.resource.tries-1: # last try http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/resources/system.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/resources/system.py b/ambari-common/src/main/python/resource_management/core/resources/system.py index b0729d1..b7958d9 100644 --- a/ambari-common/src/main/python/resource_management/core/resources/system.py +++ b/ambari-common/src/main/python/resource_management/core/resources/system.py @@ -92,7 +92,21 @@ class Execute(Resource): try_sleep = ResourceArgument(default=0) # seconds path = ForcedListArgument(default=[]) actions = Resource.actions + ["run"] - logoutput = BooleanArgument(default=False) + # TODO: handle how this is logged / tested? + """ + A one-argument function, which will be executed, + once new line comes into command output. + + The only parameter of this function is a new line which comes to output. + """ + on_new_line = ResourceArgument() + """ + True - log it in INFO mode + False - never log it + None (default) - log it in DEBUG mode + """ + logoutput = ResourceArgument(default=None) + """ if on_timeout is not set leads to failing after x seconds, otherwise calls on_timeout @@ -110,7 +124,6 @@ class Execute(Resource): - try_sleep """ wait_for_finish = BooleanArgument(default=True) - output_file = ResourceArgument() """ For calling more advanced commands use as_sudo(command) option. 
Example: http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/core/shell.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/core/shell.py b/ambari-common/src/main/python/resource_management/core/shell.py index df92e45..886f0f1 100644 --- a/ambari-common/src/main/python/resource_management/core/shell.py +++ b/ambari-common/src/main/python/resource_management/core/shell.py @@ -21,11 +21,14 @@ Ambari Agent """ import os -__all__ = ["checked_call", "call", "quote_bash_args", "as_user", "as_sudo"] +__all__ = ["non_blocking_call", "checked_call", "call", "quote_bash_args", "as_user", "as_sudo"] +import sys +import logging import string import subprocess import threading +import traceback from multiprocessing import Queue from exceptions import Fail from exceptions import ExecuteTimeoutException @@ -40,23 +43,35 @@ PLACEHOLDERS_TO_STR = { } def checked_call(command, verbose=False, logoutput=False, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False): + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None): """ - Execute the process and throw an exception on failure. - @return: return_code, stdout + Execute the shell command and throw an exception on failure. 
+ @throws Fail + @return: return_code, output """ - return _call(command, verbose, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, output_file, sudo) + return _call(command, verbose, logoutput, True, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line) def call(command, verbose=False, logoutput=False, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False): + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None): """ - Execute the process despite failures. - @return: return_code, stdout + Execute the shell command despite failures. + @return: return_code, output """ - return _call(command, verbose, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, output_file, sudo) + return _call(command, verbose, logoutput, False, cwd, env, preexec_fn, user, wait_for_finish, timeout, path, sudo, on_new_line) + +def non_blocking_call(command, verbose=False, + cwd=None, env=None, preexec_fn=None, user=None, timeout=None, path=None, sudo=False): + """ + Execute the shell command and don't wait until its completion + + @return: process object -- Popen instance + (use proc.stdout.readline to read output in cycle, don't forget to proc.stdout.close(), + to get return code use proc.wait() and after that proc.returncode) + """ + return _call(command, verbose, False, True, cwd, env, preexec_fn, user, False, timeout, path, sudo, None) def _call(command, verbose=False, logoutput=False, throw_on_failure=True, - cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, output_file=None, sudo=False): + cwd=None, env=None, preexec_fn=None, user=None, wait_for_finish=True, timeout=None, path=None, sudo=False, on_new_line=None): """ Execute shell command @@ -64,14 +79,12 @@ def _call(command, verbose=False, logoutput=False, 
throw_on_failure=True, or string of the command to execute @param logoutput: boolean, whether command output should be logged of not @param throw_on_failure: if true, when return code is not zero exception is thrown - - @return: return_code, stdout """ command_alias = string_cmd_from_args_list(command) if isinstance(command, (list, tuple)) else command # Append current PATH to env['PATH'] - env = add_current_path_to_env(env) + env = _add_current_path_to_env(env) # Append path to env['PATH'] if path: path = os.pathsep.join(path) if isinstance(path, (list, tuple)) else path @@ -88,7 +101,7 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True, command = string_cmd_from_args_list(command) # replace placeholder from as_sudo / as_user if present - env_str = get_environment_str(env) + env_str = _get_environment_str(env) for placeholder, replacement in PLACEHOLDERS_TO_STR.iteritems(): command = command.replace(placeholder, replacement.format(env_str=env_str)) @@ -100,16 +113,37 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True, proc = subprocess.Popen(subprocess_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd, env=env, shell=False, preexec_fn=preexec_fn) - - if not wait_for_finish: - return None, None if timeout: q = Queue() - t = threading.Timer( timeout, on_timeout, [proc, q] ) + t = threading.Timer( timeout, _on_timeout, [proc, q] ) t.start() - out = proc.communicate()[0].strip('\n') + if not wait_for_finish: + return proc + + # in case logoutput==False, never log. 
+ logoutput = logoutput==True and Logger.logger.isEnabledFor(logging.INFO) or logoutput==None and Logger.logger.isEnabledFor(logging.DEBUG) + out = "" + + try: + for line in iter(proc.stdout.readline, b''): + out += line + + try: + if on_new_line: + on_new_line(line) + except Exception, err: + err_msg = "Caused by on_new_line function failed with exception for input argument '{0}':\n{1}".format(line, traceback.format_exc()) + raise Fail(err_msg) + + if logoutput: + _print(line) + finally: + proc.stdout.close() + + proc.wait() + out = out.strip('\n') if timeout: if q.empty(): @@ -120,9 +154,6 @@ def _call(command, verbose=False, logoutput=False, throw_on_failure=True, code = proc.returncode - if logoutput and out: - Logger.info(out) - if throw_on_failure and code: err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s") % (command_alias, code, out)) raise Fail(err_msg) @@ -147,17 +178,26 @@ def as_sudo(command, env=None): err_msg = Logger.filter_text(("String command '%s' cannot be run as sudo. 
Please supply the command as a tuple of arguments") % (command)) raise Fail(err_msg) - env = get_environment_str(add_current_path_to_env(env)) if env else ENV_PLACEHOLDER + env = _get_environment_str(_add_current_path_to_env(env)) if env else ENV_PLACEHOLDER return "/usr/bin/sudo {0} -H -E {1}".format(env, command) def as_user(command, user, env=None): if isinstance(command, (list, tuple)): command = string_cmd_from_args_list(command) - export_env = "export {0} ; ".format(get_environment_str(add_current_path_to_env(env))) if env else EXPORT_PLACEHOLDER + export_env = "export {0} ; ".format(_get_environment_str(_add_current_path_to_env(env))) if env else EXPORT_PLACEHOLDER return "/usr/bin/sudo su {0} -l -s /bin/bash -c {1}".format(user, quote_bash_args(export_env + command)) -def add_current_path_to_env(env): +def quote_bash_args(command): + if not command: + return "''" + valid = set(string.ascii_letters + string.digits + '@%_-+=:,./') + for char in command: + if char not in valid: + return "'" + command.replace("'", "'\"'\"'") + "'" + return command + +def _add_current_path_to_env(env): result = {} if not env else env if not 'PATH' in result: @@ -169,25 +209,20 @@ def add_current_path_to_env(env): return result -def get_environment_str(env): +def _get_environment_str(env): return reduce(lambda str,x: '{0} {1}={2}'.format(str,x,quote_bash_args(env[x])), env, '') def string_cmd_from_args_list(command): return ' '.join(quote_bash_args(x) for x in command) -def on_timeout(proc, q): +def _on_timeout(proc, q): q.put(True) if proc.poll() == None: try: proc.terminate() except: pass - -def quote_bash_args(command): - if not command: - return "''" - valid = set(string.ascii_letters + string.digits + '@%_-+=:,./') - for char in command: - if char not in valid: - return "'" + command.replace("'", "'\"'\"'") + "'" - return command \ No newline at end of file + +def _print(line): + sys.stdout.write(line) + sys.stdout.flush() \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py b/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py index 8b61331..b4b0b52 100644 --- a/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py +++ b/ambari-common/src/main/python/resource_management/libraries/resources/execute_hadoop.py @@ -30,7 +30,7 @@ class ExecuteHadoop(Resource): tries = ResourceArgument(default=1) try_sleep = ResourceArgument(default=0) # seconds user = ResourceArgument() - logoutput = BooleanArgument(default=False) + logoutput = ResourceArgument() principal = ResourceArgument(default=lambda obj: obj.user) bin_dir = ResourceArgument(default=[]) # appended to $PATH environment = ResourceArgument(default={}) http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py index 88af1f9..529ca44 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py @@ -20,7 +20,7 @@ limitations under the License. 
from resource_management.core.logger import Logger from resource_management.core.exceptions import Fail from resource_management.core.resources.system import Execute -from resource_management.core.shell import call +from resource_management.core import shell from resource_management.libraries.functions import format from resource_management.libraries.functions.decorator import retry @@ -99,7 +99,7 @@ def _check_datanode_startup(): try: # 'su - hdfs -c "hdfs dfsadmin -report -live"' command = 'hdfs dfsadmin -report -live' - return_code, hdfs_output = call(command, user=params.hdfs_user) + return_code, hdfs_output = shell.call(command, user=params.hdfs_user) except: raise Fail('Unable to determine if the DataNode has started after upgrade.') http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py index 3eb9cc2..e8dfe16 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py @@ -187,16 +187,9 @@ class NameNode(Script): _print("Executing command %s\n" % command) parser = hdfs_rebalance.HdfsParser() - proc = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - close_fds=True, - cwd=basedir - ) - for line in iter(proc.stdout.readline, ''): - _print('[balancer] %s %s' % (str(datetime.now()), line )) + + def handle_new_line(line): + _print('[balancer] %s' % (line)) pl = parser.parseLine(line) if pl: res = pl.toJson() @@ -204,14 +197,14 @@ class NameNode(Script): self.put_structured_out(res) elif parser.state == 
'PROCESS_FINISED' : - _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' )) + _print('[balancer] %s' % ('Process is finished' )) self.put_structured_out({'completePercent' : 1}) - break + return - proc.stdout.close() - proc.wait() - if proc.returncode != None and proc.returncode != 0: - raise Fail('Hdfs rebalance process exited with error. See the log output') + Execute(command, + on_new_line = handle_new_line, + logoutput = False, + ) def _print(line): sys.stdout.write(line) http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py index 6f421b6..b9bd273 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py @@ -98,7 +98,7 @@ def kill_zkfc(zkfc_user): if code == 0: Logger.debug("ZKFC is running and will be killed to initiate namenode failover.") kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1") - checked_call(kill_command, verbose=True) + Execute(kill_command) def get_service_pid_file(name, user): http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py index 653d4bd..37fe7f1 100644 --- 
a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py @@ -21,7 +21,7 @@ import re from resource_management.core.logger import Logger from resource_management.core.exceptions import Fail from resource_management.core.resources.system import Execute -from resource_management.core.shell import call +from resource_management.core import shell from resource_management.libraries.functions import format @@ -65,7 +65,7 @@ def _get_current_hiveserver_version(): try: command = 'hdp-select status hive-server2' - return_code, hdp_output = call(command, user=params.hive_user) + return_code, hdp_output = shell.call(command, user=params.hive_user) except Exception, e: Logger.error(str(e)) raise Fail('Unable to execute hdp-select command to retrieve the hiveserver2 version.') http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py index c2b4bc1..ac7014d 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py @@ -17,49 +17,54 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -from __future__ import print_function from resource_management import * -import sys,subprocess,os class ServiceCheck(Script): - def service_check(self, env): - import params - env.set_params(params) - kafka_config=self.read_kafka_config(params.conf_dir) - self.set_env(params.conf_dir) - create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"." - create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists." - print("Running kafka create topic command", file=sys.stdout) - create_topic_cmd = [params.kafka_home+'/bin/kafka-topics.sh', '--zookeeper '+kafka_config['zookeeper.connect'], - '--create --topic ambari_kafka_service_check', '--partitions 1 --replication-factor 1'] - print(" ".join(create_topic_cmd), file=sys.stdout) - create_topic_process = subprocess.Popen(create_topic_cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - out, err = create_topic_process.communicate() - if out.find(create_topic_cmd_created_output) != -1: - print(out, file=sys.stdout) - sys.exit(0) - elif out.find(create_topic_cmd_exists_output) != -1: - print("Topic ambari_kafka_service_check exists", file=sys.stdout) - sys.exit(0) - else: - print(out, file=sys.stderr) - sys.exit(1) + def service_check(self, env): + import params + env.set_params(params) + + kafka_config = self.read_kafka_config() + environment = self.get_env() + + create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"." + create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists." 
+ + print "Running kafka create topic command" + create_topic_cmd = (params.kafka_home+'/bin/kafka-topics.sh', '--zookeeper '+kafka_config['zookeeper.connect'], + '--create --topic ambari_kafka_service_check', '--partitions 1 --replication-factor 1') + + code, out = shell.checked_call(create_topic_cmd, + verbose=True, env=environment) - def read_kafka_config(self,kafka_conf_dir): - conf_file = open(kafka_conf_dir+"/server.properties","r") - kafka_config = {} - for line in conf_file: - key,value = line.split("=") - kafka_config[key] = value.replace("\n","") - return kafka_config + if out.find(create_topic_cmd_created_output) != -1: + print out + elif out.find(create_topic_cmd_exists_output) != -1: + print "Topic ambari_kafka_service_check exists" + else: + raise Fail(out) - def set_env(self, kafka_conf_dir): - command = ['bash', '-c', 'source '+kafka_conf_dir+'/kafka-env.sh && env'] - proc = subprocess.Popen(command, stdout = subprocess.PIPE) - for line in proc.stdout: - (key, _, value) = line.partition("=") - os.environ[key] = value.replace("\n","") - proc.communicate() + def read_kafka_config(self): + import params + + kafka_config = {} + with open(params.conf_dir+"/server.properties","r") as conf_file: + for line in conf_file: + key,value = line.split("=") + kafka_config[key] = value.replace("\n","") + + return kafka_config + + def get_env(self): + import params + code, out = shell.checked_call(format('source {conf_dir}/kafka-env.sh && env')) + + environment = {} + for line in out.split("\n"): + (key, _, value) = line.partition("=") + environment[key] = value.replace("\n","") + + return environment if __name__ == "__main__": ServiceCheck().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py index e82c320..54e0fae 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/YARN/package/scripts/nodemanager_upgrade.py @@ -22,7 +22,7 @@ import subprocess from resource_management.core.logger import Logger from resource_management.core.exceptions import Fail from resource_management.core.resources.system import Execute -from resource_management.core.shell import call +from resource_management.core import shell from resource_management.libraries.functions.decorator import retry @@ -56,7 +56,7 @@ def _check_nodemanager_startup(): try: # 'su - yarn -c "yarn node -status c6401.ambari.apache.org:45454"' - return_code, yarn_output = call(command, user=params.hdfs_user) + return_code, yarn_output = shell.call(command, user=params.hdfs_user) except: raise Fail('Unable to determine if the NodeManager has started after upgrade.') http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py index d16a749..42e195c 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/KERBEROS/package/scripts/kerberos_common.py @@ -312,7 +312,7 @@ class KerberosScript(Script): # If a test keytab file is available, simply use it if (keytab_file is not None) and 
(os.path.isfile(keytab_file)): command = 'kinit -k -t %s %s' % (keytab_file, principal) - shell.checked_call(command) + Execute(command) return shell.checked_call('kdestroy') # If base64-encoded test keytab data is available; then decode it, write it to a temporary file @@ -324,7 +324,7 @@ class KerberosScript(Script): try: command = 'kinit -k -t %s %s' % (test_keytab_file, principal) - shell.checked_call(command) + Execute(command) return shell.checked_call('kdestroy') except: raise http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py index 1c032ff..2fc8549 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py +++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py @@ -20,6 +20,7 @@ limitations under the License. 
from stacks.utils.RMFTestCase import * import json from mock.mock import MagicMock, patch +from resource_management.core import shell from resource_management.core.exceptions import Fail class TestDatanode(RMFTestCase): @@ -441,7 +442,7 @@ class TestDatanode(RMFTestCase): @patch('time.sleep') - @patch("subprocess.Popen") + @patch.object(shell, "call") def test_post_rolling_restart(self, process_mock, time_mock): process_output = """ Live datanodes (2): @@ -464,10 +465,7 @@ class TestDatanode(RMFTestCase): Last contact: Fri Dec 12 20:47:21 UTC 2014 """ - process = MagicMock() - process.communicate.return_value = [process_output] - process.returncode = 0 - process_mock.return_value = process + process_mock.return_value = (0, process_output) self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py", classname = "DataNode", @@ -482,12 +480,9 @@ class TestDatanode(RMFTestCase): @patch('time.sleep') - @patch("subprocess.Popen") + @patch.object(shell, "call") def test_post_rolling_restart_datanode_not_ready(self, process_mock, time_mock): - process = MagicMock() - process.communicate.return_value = ['There are no DataNodes here!'] - process.returncode = 0 - process_mock.return_value = process + process_mock.return_value = (0, 'There are no DataNodes here!') try: self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py", @@ -504,12 +499,9 @@ class TestDatanode(RMFTestCase): @patch('time.sleep') - @patch("subprocess.Popen") + @patch.object(shell, "call") def test_post_rolling_restart_bad_returncode(self, process_mock, time_mock): - process = MagicMock() - process.communicate.return_value = ['some'] - process.returncode = 999 - process_mock.return_value = process + process_mock.return_value = (0, 'some') try: self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py", http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py 
---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py index 78a2f72..3981e33 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py +++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py @@ -88,6 +88,7 @@ class TestNamenode(RMFTestCase): environment = {'HADOOP_LIBEXEC_DIR': '/usr/lib/hadoop/libexec'}, not_if = 'ls /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid >/dev/null 2>&1 && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid` >/dev/null 2>&1', ) + self.printResources() self.assertResourceCalled('Execute', 'hdfs --config /etc/hadoop/conf dfsadmin -safemode leave', path = ['/usr/bin'], user = 'hdfs', @@ -776,26 +777,6 @@ class TestNamenode(RMFTestCase): @patch("resource_management.libraries.script.Script.put_structured_out") def test_rebalance_hdfs(self, pso): - Popen_Mock.return_value = 1 - with patch("subprocess.Popen", new_callable=Popen_Mock): - ll = subprocess.Popen() - self.assertTrue(isinstance(ll.stdout.readline(),str)) - try: - self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py", - classname = "NameNode", - command = "rebalancehdfs", - config_file = "rebalancehdfs_default.json", - hdp_stack_version = self.STACK_VERSION, - target = RMFTestCase.TARGET_COMMON_SERVICES - ) - self.fail("Exception was not thrown") - except resource_management.core.exceptions.Fail: - pass ##expected - - pso.reset_mock() - Popen_Mock.return_value = 0 - ll = subprocess.Popen() - self.assertTrue(isinstance(ll.stdout.readline(),str)) self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py", classname = "NameNode", command = "rebalancehdfs", @@ -803,7 +784,11 @@ class TestNamenode(RMFTestCase): hdp_stack_version = self.STACK_VERSION, target = RMFTestCase.TARGET_COMMON_SERVICES ) - self.assertEqual(pso.call_count, 2, "Output was not 
parsed properly") + self.assertResourceCalled('Execute', "/usr/bin/sudo su hdfs -l -s /bin/bash -c 'export PATH=/bin:/usr/bin ; hdfs --config /etc/hadoop/conf balancer -threshold -1'", + logoutput = False, + on_new_line = FunctionMock('handle_new_line'), + ) + self.assertNoMoreResources() class Popen_Mock: return_value = 1 http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py index 8046313..0adb266 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py +++ b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py @@ -536,25 +536,15 @@ class TestHiveServer(RMFTestCase): @patch("hive_server.HiveServer.pre_rolling_restart") @patch("hive_server.HiveServer.start") - @patch("subprocess.Popen") - def test_stop_during_upgrade(self, process_mock, hive_server_start_mock, + @patch.object(shell, "call", new=MagicMock(return_value=(0,"hive-server2 - 2.2.0.0-2041"))) + def test_stop_during_upgrade(self, hive_server_start_mock, hive_server_pre_rolling_mock): - - process_output = 'hive-server2 - 2.2.0.0-2041' - - process = MagicMock() - process.communicate.return_value = [process_output] - process.returncode = 0 - process_mock.return_value = process - + self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py", classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json", hdp_stack_version = self.UPGRADE_STACK_VERSION, target = RMFTestCase.TARGET_COMMON_SERVICES ) - self.assertTrue(process_mock.called) - self.assertEqual(process_mock.call_count,2) - self.assertResourceCalled('Execute', 'hive --service hiveserver2 --deregister 2.2.0.0-2041', 
path=['/bin:/usr/hdp/current/hive-server2/bin:/usr/hdp/current/hadoop-client/bin'], tries=1, user='hive') @@ -564,17 +554,9 @@ class TestHiveServer(RMFTestCase): @patch("hive_server.HiveServer.pre_rolling_restart") @patch("hive_server.HiveServer.start") - @patch("subprocess.Popen") - def test_stop_during_upgrade_bad_hive_version(self, process_mock, hive_server_start_mock, + @patch.object(shell, "call", new=MagicMock(return_value=(0,"BAD VERSION"))) + def test_stop_during_upgrade_bad_hive_version(self, hive_server_start_mock, hive_server_pre_rolling_mock): - - process_output = 'BAD VERSION' - - process = MagicMock() - process.communicate.return_value = [process_output] - process.returncode = 0 - process_mock.return_value = process - try: self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py", classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json", http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py index d4229dc..bc9b831 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py +++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py @@ -20,6 +20,7 @@ limitations under the License. 
from mock.mock import MagicMock, call, patch from stacks.utils.RMFTestCase import * from resource_management.core.exceptions import Fail +from resource_management.core import shell import os origin_exists = os.path.exists @@ -559,16 +560,13 @@ class TestNodeManager(RMFTestCase): ) @patch('time.sleep') - @patch("subprocess.Popen") + @patch.object(shell, "call") def test_post_rolling_restart(self, process_mock, time_mock): process_output = """ c6401.ambari.apache.org:45454 RUNNING c6401.ambari.apache.org:8042 0 """ - process = MagicMock() - process.communicate.return_value = [process_output] - process.returncode = 0 - process_mock.return_value = process + process_mock.return_value = (0, process_output) self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py", classname="Nodemanager", command = "post_rolling_restart", config_file="default.json") @@ -578,16 +576,13 @@ class TestNodeManager(RMFTestCase): @patch('time.sleep') - @patch("subprocess.Popen") + @patch.object(shell, "call") def test_post_rolling_restart_nodemanager_not_ready(self, process_mock, time_mock): process_output = """ c9999.ambari.apache.org:45454 RUNNING c9999.ambari.apache.org:8042 0 """ - process = MagicMock() - process.communicate.return_value = [process_output] - process.returncode = 0 - process_mock.return_value = process + process_mock.return_value = (0, process_output) try: self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py", @@ -599,16 +594,13 @@ class TestNodeManager(RMFTestCase): @patch('time.sleep') - @patch("subprocess.Popen") + @patch.object(shell, "call") def test_post_rolling_restart_nodemanager_not_ready(self, process_mock, time_mock): process_output = """ c6401.ambari.apache.org:45454 RUNNING c6401.ambari.apache.org:8042 0 """ - process = MagicMock() - process.communicate.return_value = [process_output] - process.returncode = 999 - process_mock.return_value = process + process_mock.return_value = (999, process_output) try: 
self.executeScript("2.0.6/services/YARN/package/scripts/nodemanager.py", http://git-wip-us.apache.org/repos/asf/ambari/blob/79cffa16/ambari-server/src/test/python/stacks/utils/RMFTestCase.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/utils/RMFTestCase.py b/ambari-server/src/test/python/stacks/utils/RMFTestCase.py index 63bcdb8..b8e819d 100644 --- a/ambari-server/src/test/python/stacks/utils/RMFTestCase.py +++ b/ambari-server/src/test/python/stacks/utils/RMFTestCase.py @@ -17,7 +17,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' -__all__ = ["RMFTestCase", "Template", "StaticFile", "InlineTemplate", "UnknownConfigurationMock"] +__all__ = ["RMFTestCase", "Template", "StaticFile", "InlineTemplate", "UnknownConfigurationMock", "FunctionMock"] from unittest import TestCase import json @@ -172,6 +172,8 @@ class RMFTestCase(TestCase): val = oct(v) elif isinstance( v, UnknownConfiguration): val = "UnknownConfigurationMock()" + elif hasattr(v, '__call__') and hasattr(v, '__name__'): + val = "FunctionMock('{0}')".format(v.__name__) else: val = self._ppformat(v) # If value is multiline, format it @@ -196,7 +198,7 @@ class RMFTestCase(TestCase): print(self.reindent("self.assertNoMoreResources()", intendation)) def assertResourceCalled(self, resource_type, name, **kwargs): - with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"): + with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"): self.assertNotEqual(len(RMFTestCase.env.resource_list), 0, "There was no more resources executed!") resource = RMFTestCase.env.resource_list.pop(0) @@ -240,4 +242,15 @@ class UnknownConfigurationMock(): def __repr__(self): return "UnknownConfigurationMock()" + +class FunctionMock(): + def 
__init__(self, name): + self.name = name + + def __ne__(self, other): + return not self.__eq__(other) + + def __eq__(self, other): + return hasattr(other, '__call__') and hasattr(other, '__name__') and self.name == other.__name__ +
