Repository: ambari Updated Branches: refs/heads/trunk 7e8d3dea8 -> 21fde6765
AMBARI-11242. Every minute Ambari agent freezes for 10-15 seconds while executing status commands (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/21fde676 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/21fde676 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/21fde676 Branch: refs/heads/trunk Commit: 21fde676532f4073e7e089e8e1780412f827a949 Parents: 7e8d3de Author: Andrew Onishuk <[email protected]> Authored: Tue May 19 19:04:25 2015 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue May 19 19:04:25 2015 +0300 ---------------------------------------------------------------------- .../ambari_agent/CustomServiceOrchestrator.py | 17 ++-- .../main/python/ambari_agent/PythonExecutor.py | 16 ++-- .../ambari_agent/PythonReflectiveExecutor.py | 98 ++++++++++++++++++++ .../TestCustomServiceOrchestrator.py | 11 ++- .../python/ambari_agent/TestPythonExecutor.py | 19 ++-- .../libraries/functions/check_process_status.py | 12 ++- .../libraries/script/script.py | 7 -- 7 files changed, 138 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index b107e3f..e291196 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -28,6 +28,7 @@ import threading from FileCache import FileCache from AgentException import AgentException from PythonExecutor import PythonExecutor +from PythonReflectiveExecutor import PythonReflectiveExecutor import hostname @@ -55,6 +56,7 @@ class 
CustomServiceOrchestrator(): AMBARI_SERVER_HOST = "ambari_server_host" DONT_DEBUG_FAILURES_FOR_COMMANDS = [COMMAND_NAME_SECURITY_STATUS, COMMAND_NAME_STATUS] + REFLECTIVELY_RUN_COMMANDS = [COMMAND_NAME_SECURITY_STATUS, COMMAND_NAME_STATUS] # -- commands which run a lot and often (this increases their speed) def __init__(self, config, controller): self.config = config @@ -95,12 +97,15 @@ class CustomServiceOrchestrator(): else: logger.warn("Unable to find pid by taskId = %s" % task_id) - def get_py_executor(self): + def get_py_executor(self, forced_command_name): """ Wrapper for unit testing :return: """ - return PythonExecutor(self.tmp_dir, self.config) + if forced_command_name in self.REFLECTIVELY_RUN_COMMANDS: + return PythonReflectiveExecutor(self.tmp_dir, self.config) + else: + return PythonExecutor(self.tmp_dir, self.config) def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None, override_output_files=True, retry=False): @@ -178,13 +183,13 @@ class CustomServiceOrchestrator(): if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1: raise AgentException("Background commands are supported without hooks only") - python_executor = self.get_py_executor() + python_executor = self.get_py_executor(forced_command_name) for py_file, current_base_dir in filtered_py_file_list: log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS - script_params = [command_name, json_path, current_base_dir] + script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir] ret = python_executor.run_file(py_file, script_params, - self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout, - tmpstrucoutfile, logger_level, self.map_task_to_process, + tmpoutfile, tmperrfile, timeout, + tmpstrucoutfile, self.map_task_to_process, task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure) # Next 
run_file() invocations should always append to current output override_output_files = False http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py index 3aa26a0..abbbe85 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py @@ -36,7 +36,7 @@ from ambari_commons.shell import shellRunner logger = logging.getLogger() -class PythonExecutor: +class PythonExecutor(object): """ Performs functionality for executing python scripts. Warning: class maintains internal state. As a result, instances should not be @@ -62,8 +62,8 @@ class PythonExecutor: tmperr = open(tmperrfile, 'a') return tmpout, tmperr - def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, - timeout, tmpstructedoutfile, logger_level, callback, task_id, + def run_file(self, script, script_params, tmpoutfile, tmperrfile, + timeout, tmpstructedoutfile, callback, task_id, override_output_files = True, handle = None, log_info_on_failure=True): """ Executes the specified python file in a separate subprocess. @@ -76,9 +76,9 @@ class PythonExecutor: The structured out file, however, is preserved during multiple invocations that use the same file. 
""" - script_params += [tmpstructedoutfile, logger_level, tmp_dir] pythonCommand = self.python_command(script, script_params) logger.debug("Running command " + pprint.pformat(pythonCommand)) + if handle is None: tmpout, tmperr = self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files) @@ -94,7 +94,7 @@ class PythonExecutor: process.communicate() self.event.set() thread.join() - result = self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout) + result = self.prepare_process_result(process.returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout) if log_info_on_failure and result['exitcode']: self.on_failure(pythonCommand, result) @@ -123,10 +123,8 @@ class PythonExecutor: ret = shell_runner.run(cmd) logger.info("Command '{0}' returned {1}. {2}{3}".format(cmd, ret["exitCode"], ret["error"], ret["output"])) - def prepare_process_result(self, process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None): + def prepare_process_result(self, returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None): out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile) - # Building results - returncode = process.returncode if self.python_process_has_been_killed: error = str(error) + "\n Python script has been killed due to timeout" + \ @@ -227,7 +225,7 @@ class BackgroundThread(threading.Thread): process.communicate() self.holder.handle.exitCode = process.returncode - process_condensed_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file) + process_condensed_result = self.pythonExecutor.prepare_process_result(process.returncode, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file) logger.debug("Calling callback with args %s" % process_condensed_result) self.holder.handle.on_background_command_complete_callback(process_condensed_result, 
self.holder.handle) logger.debug("Exiting from thread for holder pid %s" % self.holder.handle.pid) http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py new file mode 100644 index 0000000..4a7ed1c --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from PythonExecutor import PythonExecutor + +import imp +import sys +import os +import pprint +import logging +import copy + +logger = logging.getLogger() + +class PythonReflectiveExecutor(PythonExecutor): + """ + Some commands like STATUS, SECURITY_STATUS commands are run a lot, and need to be run really fast. + Otherwise agent will hang waiting for them to complete every X seconds. + + Running the commands not in new proccess, but reflectively makes this really fast. 
+ """ + + def __init__(self, tmpDir, config): + super(PythonReflectiveExecutor, self).__init__(tmpDir, config) + + def run_file(self, script, script_params, tmpoutfile, tmperrfile, + timeout, tmpstructedoutfile, callback, task_id, + override_output_files = True, handle = None, log_info_on_failure=True): + pythonCommand = self.python_command(script, script_params) + logger.debug("Running command reflectively " + pprint.pformat(pythonCommand)) + + script_dir = os.path.dirname(script) + self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files) + returncode = 1 + + try: + with PythonContext(script_dir, pythonCommand): + imp.load_source('__main__', script) + except SystemExit as e: + returncode = e.code + if returncode: + logger.debug("Reflective command failed with return_code=" + str(e)) + except Exception: + logger.debug("Reflective command failed with exception:", exc_info=1) + else: + returncode = 0 + + return self.prepare_process_result(returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout) + +class PythonContext: + """ + Sets and resets some context like imports, pythonpath, args. + Also it disable logging into ambari-agent.log for reflectively called scripts. 
+ """ + def __init__(self, script_dir, pythonCommand): + self.script_dir = script_dir + self.pythonCommand = pythonCommand + + def __enter__(self): + self.old_sys_path = copy.copy(sys.path) + self.old_agv = copy.copy(sys.argv) + self.old_sys_modules = copy.copy(sys.modules) + self.old_logging_disable = logging.root.manager.disable + + logging.disable(logging.ERROR) + sys.path.append(self.script_dir) + sys.argv = self.pythonCommand[1:] + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.path = self.old_sys_path + sys.argv = self.old_agv + logging.disable(self.old_logging_disable) + self.revert_sys_modules(self.old_sys_modules) + return False + + def revert_sys_modules(self, value): + sys.modules.update(value) + + for k in copy.copy(sys.modules): + if not k in value: + del sys.modules[k] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py index 2fb2ae5..831ecce 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py @@ -285,14 +285,15 @@ class TestCustomServiceOrchestrator(TestCase): 'exitcode': 0, } ret = orchestrator.runCommand(command, "out.txt", "err.txt", - forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS) + forced_command_name=CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON) ## Check that override_output_files was true only during first call - self.assertEquals(run_file_mock.call_args_list[0][0][10], True) - self.assertEquals(run_file_mock.call_args_list[1][0][10], False) - self.assertEquals(run_file_mock.call_args_list[2][0][10], False) + print run_file_mock + 
self.assertEquals(run_file_mock.call_args_list[0][0][8], True) + self.assertEquals(run_file_mock.call_args_list[1][0][8], False) + self.assertEquals(run_file_mock.call_args_list[2][0][8], False) ## Check that forced_command_name was taken into account self.assertEqual(run_file_mock.call_args_list[0][0][1][0], - CustomServiceOrchestrator.COMMAND_NAME_STATUS) + CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON) run_file_mock.reset_mock() http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py index 4105500..eb6dd29 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py +++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py @@ -37,6 +37,7 @@ if get_platform() != PLATFORM_WINDOWS: else: os_distro_value = ('win2012serverr2','6.3','WindowsServer') [email protected](PythonExecutor, "open_subprocess_files", new=MagicMock(return_value =("", ""))) class TestPythonExecutor(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @@ -64,8 +65,8 @@ class TestPythonExecutor(TestCase): subproc_mock.returncode = None callback_method = MagicMock() thread = Thread(target = executor.run_file, args = ("fake_puppetFile", - ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, - PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO", callback_method, '1')) + ["arg1", "arg2"], tmpoutfile, tmperrfile, + PYTHON_TIMEOUT_SECONDS, tmpstrucout, callback_method, '1')) thread.start() time.sleep(0.1) subproc_mock.finished_event.wait() @@ -96,9 +97,9 @@ class TestPythonExecutor(TestCase): subproc_mock.returncode = 0 callback_method = MagicMock() thread = Thread(target = executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"], - "/fake_tmp_dir", 
tmpoutfile, tmperrfile, + tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout, - "INFO", callback_method, "1-1")) + callback_method, "1-1")) thread.start() time.sleep(0.1) subproc_mock.should_finish_event.set() @@ -131,10 +132,10 @@ class TestPythonExecutor(TestCase): subproc_mock.returncode = 0 subproc_mock.should_finish_event.set() callback_method = MagicMock() - result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir", + result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, - tmpstructuredoutfile, "INFO", callback_method, "1-1") - self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output', + tmpstructuredoutfile, callback_method, "1-1") + self.assertEquals(result, {'exitcode': 0, 'stderr': '', 'stdout': '', 'structuredOut': {}}) self.assertTrue(callback_method.called) @@ -179,11 +180,7 @@ class TestPythonExecutor(TestCase): def communicate(self): self.started_event.set() - self.tmpout.write("Dummy output") - self.tmpout.flush() - self.tmperr.write("Dummy err") - self.tmperr.flush() self.should_finish_event.wait() self.finished_event.set() pass http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-common/src/main/python/resource_management/libraries/functions/check_process_status.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/check_process_status.py b/ambari-common/src/main/python/resource_management/libraries/functions/check_process_status.py index a3e4a39..21cb3bb 100644 --- a/ambari-common/src/main/python/resource_management/libraries/functions/check_process_status.py +++ b/ambari-common/src/main/python/resource_management/libraries/functions/check_process_status.py @@ -47,10 +47,14 @@ def check_process_status(pid_file): Logger.debug("Pid file {0} does not exist".format(pid_file)) raise ComponentIsNotRunning() - code, out = 
shell.call(["ps","-p", str(pid)]) - - if code: + try: + # Kill will not actually kill the process + # From the doc: + # If sig is 0, then no signal is sent, but error checking is still + # performed; this can be used to check for the existence of a + # process ID or process group ID. + os.kill(pid, 0) + except OSError: Logger.debug("Process with pid {0} is not running. Stale pid file" " at {1}".format(pid, pid_file)) raise ComponentIsNotRunning() - pass http://git-wip-us.apache.org/repos/asf/ambari/blob/21fde676/ambari-common/src/main/python/resource_management/libraries/script/script.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py index 8de9247..f9eda81 100644 --- a/ambari-common/src/main/python/resource_management/libraries/script/script.py +++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py @@ -214,13 +214,6 @@ class Script(object): method(env) if command_name == "install": self.set_version() - except ClientComponentHasNoStatus or ComponentIsNotRunning: - # Support of component status checks. - # Non-zero exit code is interpreted as an INSTALLED status of a component - sys.exit(1) - except Fail: - logger.exception("Error while executing command '{0}':".format(command_name)) - sys.exit(1) finally: if self.should_expose_component_version(command_name): self.save_component_version_to_structured_out()
