Repository: ambari Updated Branches: refs/heads/branch-feature-AMBARI-18456 532caef33 -> 159ad0032
Revert "AMBARI-18629. HDFS goes down after installing cluster (aonishuk) and AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)" Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c3b31d6f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c3b31d6f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c3b31d6f Branch: refs/heads/branch-feature-AMBARI-18456 Commit: c3b31d6f7461625d6a7fe533025c5f604c67ffb5 Parents: 7ed5259 Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Wed Oct 19 01:52:05 2016 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Wed Oct 19 01:52:05 2016 +0300 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 - .../src/main/python/ambari_agent/ActionQueue.py | 16 +------- .../ambari_agent/PythonReflectiveExecutor.py | 25 +++--------- .../test/python/ambari_agent/TestActionQueue.py | 3 +- .../main/python/ambari_commons/thread_utils.py | 43 -------------------- 5 files changed, 8 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c3b31d6f/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 1c39c24..914e09a 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -32,7 +32,6 @@ tolerate_download_failures=true run_as_user=root parallel_execution=0 alert_grace_period=5 -status_command_timeout=2 alert_kinit_timeout=14400000 system_resource_overrides=/etc/resource_overrides ; memory_threshold_soft_mb=400 http://git-wip-us.apache.org/repos/asf/ambari/blob/c3b31d6f/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5962d94..f104939 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -29,14 +29,12 @@ import time import signal from AgentException import AgentException -from PythonReflectiveExecutor import PythonReflectiveExecutor from LiveStatus import LiveStatus from ActualConfigHandler import ActualConfigHandler from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle from ambari_commons.str_utils import split_on_chunks -from ambari_commons.thread_utils import terminate_thread logger = logging.getLogger() @@ -87,7 +85,6 @@ class ActionQueue(threading.Thread): self.tmpdir = config.get('agent', 'prefix') self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller) self.parallel_execution = config.get_parallel_exec_option() - self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 2)) if self.parallel_execution == 1: logger.info("Parallel execution is enabled, will execute agent commands in parallel") @@ -228,18 +225,7 @@ class ActionQueue(threading.Thread): if self.controller.recovery_manager.enabled(): self.controller.recovery_manager.stop_execution_command() elif commandType == self.STATUS_COMMAND: - component_name = command['componentName'] - - thread = threading.Thread(target = self.execute_status_command, args = (command,)) - thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping - thread.start() - thread.join(timeout=self.status_command_timeout) - - if thread.isAlive(): - terminate_thread(thread) - # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent. - PythonReflectiveExecutor.last_context.revert() - logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout)) + self.execute_status_command(command) else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: http://git-wip-us.apache.org/repos/asf/ambari/blob/c3b31d6f/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py index b476671..655b2fc 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py @@ -53,9 +53,7 @@ class PythonReflectiveExecutor(PythonExecutor): returncode = 1 try: - current_context = PythonContext(script_dir, pythonCommand) - PythonReflectiveExecutor.last_context = current_context - with current_context: + with PythonContext(script_dir, pythonCommand): imp.load_source('__main__', script) except SystemExit as e: returncode = e.code @@ -64,10 +62,7 @@ class PythonReflectiveExecutor(PythonExecutor): except (ClientComponentHasNoStatus, ComponentIsNotRunning): logger.debug("Reflective command failed with exception:", exc_info=1) except Exception: - if current_context.is_forced_revert: - logger.info("Hanging status command finished its execution") - else: - logger.info("Reflective command failed with exception:", exc_info=1) + logger.info("Reflective command failed with exception:", exc_info=1) else: returncode = 0 @@ -81,8 +76,6 @@ class PythonContext: def __init__(self, script_dir, pythonCommand): self.script_dir = script_dir self.pythonCommand = pythonCommand - self.is_reverted = False - self.is_forced_revert = False def __enter__(self): self.old_sys_path = copy.copy(sys.path) @@ -95,18 +88,12 @@ class PythonContext: sys.argv = self.pythonCommand[1:] def __exit__(self, exc_type, exc_val, exc_tb): - self.revert(is_forced_revert=False) + sys.path = self.old_sys_path + sys.argv = self.old_agv + logging.disable(self.old_logging_disable) + self.revert_sys_modules(self.old_sys_modules) return False - def revert(self, is_forced_revert=True): - if not self.is_reverted: - self.is_forced_revert = is_forced_revert - self.is_reverted = True - sys.path = self.old_sys_path - sys.argv = self.old_agv - logging.disable(self.old_logging_disable) - self.revert_sys_modules(self.old_sys_modules) - def revert_sys_modules(self, value): sys.modules.update(value) http://git-wip-us.apache.org/repos/asf/ambari/blob/c3b31d6f/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 32773b8..7d04d42 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -225,7 +225,6 @@ class TestActionQueue(TestCase): retryable_command = { 'commandType': 'EXECUTION_COMMAND', 'role': 'NAMENODE', - 'componentName': 'NAMENODE', 'roleCommand': 'INSTALL', 'commandId': '1-1', 'taskId': 19, @@ -323,7 +322,6 @@ class TestActionQueue(TestCase): } status_command = { 'commandType' : ActionQueue.STATUS_COMMAND, - 'componentName': 'NAMENODE' } wrong_command = { 'commandType' : "SOME_WRONG_COMMAND", @@ -1128,6 +1126,7 @@ class TestActionQueue(TestCase): self.assertTrue(runCommand_mock.called) self.assertEqual(2, runCommand_mock.call_count) self.assertEqual(1, sleep_mock.call_count) + sleep_mock.assert_has_calls([call(1)], False) runCommand_mock.assert_has_calls([ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False), http://git-wip-us.apache.org/repos/asf/ambari/blob/c3b31d6f/ambari-common/src/main/python/ambari_commons/thread_utils.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py deleted file mode 100644 index 952022c..0000000 --- a/ambari-common/src/main/python/ambari_commons/thread_utils.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -def terminate_thread(thread): - """Terminates a python thread abruptly from another thread. - - This is consider a bad pattern to do this. - If possible, please consider handling stopping of the thread from inside of it - or creating thread as a separate process (multiprocessing module). - - :param thread: a threading.Thread instance - """ - import ctypes - if not thread.isAlive(): - return - - exc = ctypes.py_object(SystemExit) - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(thread.ident), exc) - if res == 0: - raise ValueError("nonexistent thread id") - elif res > 1: - # """if it returns a number greater than one, you're in trouble, - # and you should call it again with exc=NULL to revert the effect""" - ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) - raise SystemError("PyThreadState_SetAsyncExc failed") \ No newline at end of file