Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 736e3ec2f -> ad9fce8a8
AMBARI-15558. ambari-agent upstart script broken in RHEL6 (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad9fce8a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad9fce8a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad9fce8a Branch: refs/heads/branch-2.2 Commit: ad9fce8a872f79912176c0221d156d456a968d88 Parents: 736e3ec Author: Andrew Onishuk <[email protected]> Authored: Thu Mar 24 17:15:19 2016 +0200 Committer: Andrew Onishuk <[email protected]> Committed: Thu Mar 24 17:15:19 2016 +0200 ---------------------------------------------------------------------- ambari-agent/etc/init/ambari-agent.conf | 1 - .../src/main/python/ambari_agent/ExitHelper.py | 1 + .../python/ambari_agent/HeartbeatHandlers.py | 27 ++++-------- .../src/main/python/ambari_agent/main.py | 43 ++++++++++++-------- .../test/python/ambari_agent/TestController.py | 5 --- .../src/test/python/ambari_agent/TestMain.py | 32 ++++++++++----- 6 files changed, 55 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/etc/init/ambari-agent.conf ---------------------------------------------------------------------- diff --git a/ambari-agent/etc/init/ambari-agent.conf b/ambari-agent/etc/init/ambari-agent.conf index 75c1b06..b3f2987 100644 --- a/ambari-agent/etc/init/ambari-agent.conf +++ b/ambari-agent/etc/init/ambari-agent.conf @@ -17,7 +17,6 @@ description "ambari agent" stop on runlevel [06] -kill signal SIGKILL respawn script http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py index 06dfadb..e51646f 100644 --- 
a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py +++ b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py @@ -64,6 +64,7 @@ class ExitHelper(object): def exit(self, code): self.execute_cleanup() + logger.info("Cleanup finished, exiting with code:" + str(code)) os._exit(code) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py index f9d0e14..4347595 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py @@ -94,18 +94,9 @@ def debug(sig, frame): @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) class HeartbeatStopHandlersLinux(HeartbeatStopHandlers): - def __init__(self, stopEvent=None): - # Event is used for synchronizing heartbeat iterations (to make possible - # manual wait() interruption between heartbeats ) + def __init__(self): self.heartbeat_wait_event = threading.Event() - - # Event is used to stop the Agent process - if stopEvent is None: - # Allow standalone testing - self.stop_event = threading.Event() - else: - # Allow one unique event per process - self.stop_event = stopEvent + self._stop = False def set_heartbeat(self): self.heartbeat_wait_event.set() @@ -114,19 +105,15 @@ class HeartbeatStopHandlersLinux(HeartbeatStopHandlers): self.heartbeat_wait_event.clear() def set_stop(self): - self.stop_event.set() + self._stop = True def wait(self, timeout1, timeout2=0): - if self.heartbeat_wait_event.wait(timeout=timeout1): - # Event signaled, exit - return 1 - # Stop loop when stop event received - # Otherwise sleep a bit more to allow STATUS_COMMAND results to be collected - # and sent in one heartbeat. 
Also avoid server overload with heartbeats - if self.stop_event.wait(timeout=timeout2): + if self._stop: logger.info("Stop event received") return 0 - # Timeout + + if self.heartbeat_wait_event.wait(timeout=timeout1): + return 1 return -1 http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 22e2e4a..b146ba8 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -59,6 +59,9 @@ IS_LINUX = platform.system() == "Linux" SYSLOG_FORMAT_STRING = ' ambari_agent - %(filename)s - [%(process)d] - %(name)s - %(levelname)s - %(message)s' SYSLOG_FORMATTER = logging.Formatter(SYSLOG_FORMAT_STRING) +GRACEFUL_STOP_TRIES = 10 +GRACEFUL_STOP_TRIES_SLEEP = 3 + def setup_logging(logger, filename, logging_level): formatter = logging.Formatter(formatstr) @@ -164,22 +167,26 @@ def daemonize(): pid = str(os.getpid()) file(ProcessHelper.pidfile, 'w').write(pid) - def stop_agent(): # stop existing Ambari agent pid = -1 runner = shellRunner() try: - f = open(ProcessHelper.pidfile, 'r') - pid = f.read() + with open(ProcessHelper.pidfile, 'r') as f: + pid = f.read() pid = int(pid) - f.close() + runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)]) - time.sleep(5) - if os.path.exists(ProcessHelper.pidfile): - raise Exception("PID file still exists.") - sys.exit(0) + for i in range(GRACEFUL_STOP_TRIES): + result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)]) + if result['exitCode'] != 0: + logger.info("Agent died gracefully, exiting.") + sys.exit(0) + time.sleep(GRACEFUL_STOP_TRIES_SLEEP) + logger.info("Agent not going to die gracefully, going to execute kill -9") + raise Exception("Agent is running") except Exception, err: + #raise if pid == -1: print ("Agent process is 
not running") else: @@ -236,7 +243,6 @@ def main(heartbeat_stop_callback=None): default_cfg = {'agent': {'prefix': '/home/ambari'}} config.load(default_cfg) - bind_signal_handlers(agentPid) if (len(sys.argv) > 1) and sys.argv[1] == 'stop': stop_agent() @@ -292,11 +298,12 @@ def main(heartbeat_stop_callback=None): # Launch Controller communication controller = Controller(config, heartbeat_stop_callback) controller.start() - controller.join() - if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY: - ExitHelper.execute_cleanup() - stop_agent() - logger.info("finished") + + # controller.join() is not appropriate here as it blocks the signal handlers for stop etc. + while controller.is_alive(): + time.sleep(0.1) + + ExitHelper().exit(0) if __name__ == "__main__": is_logger_setup = False @@ -304,7 +311,9 @@ if __name__ == "__main__": heartbeat_stop_callback = bind_signal_handlers(agentPid) main(heartbeat_stop_callback) - except: + except SystemExit as e: + raise e + except BaseException as e: if is_logger_setup: - logger.exception("Fatal exception occurred:") - raise + logger.exception("Exiting with exception:" + e) + raise http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 00725c4..9b3ce47 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -545,8 +545,6 @@ class TestController(unittest.TestCase): response["restartAgent"] = "false" self.controller.heartbeatWithServer() - event_mock.assert_any_call(timeout= - self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) # Check that server continues to heartbeat after connection errors self.controller.responseId = 1 @@ -566,9 +564,6 @@ class 
TestController(unittest.TestCase): self.controller.heartbeatWithServer() self.assertTrue(sendRequest.call_count > 5) - event_mock.assert_called_with(timeout= - self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) - sys.stdout = sys.__stdout__ self.controller.sendRequest = Controller.Controller.sendRequest self.controller.sendRequest = Controller.Controller.addToQueue http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index 873f0ba..63f8024 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -29,7 +29,7 @@ import ConfigParser from ambari_commons import OSCheck from only_for_platform import get_platform, not_for_platform, only_for_platform, PLATFORM_WINDOWS, PLATFORM_LINUX -from mock.mock import MagicMock, patch, ANY, Mock +from mock.mock import MagicMock, patch, ANY, Mock, call if get_platform() != PLATFORM_WINDOWS: os_distro_value = ('Suse','11','Final') @@ -207,13 +207,13 @@ class TestMain(unittest.TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("time.sleep") @patch.object(shellRunnerLinux,"run") - @patch("sys.exit") @patch("os.path.exists") - def test_daemonize_and_stop(self, exists_mock, sys_exit_mock, kill_mock, sleep_mock): + def test_daemonize_and_stop(self, exists_mock, kill_mock, sleep_mock): oldpid = ProcessHelper.pidfile pid = str(os.getpid()) _, tmpoutfile = tempfile.mkstemp() ProcessHelper.pidfile = tmpoutfile + kill_mock.return_value = {'exitCode': 1} # Test daemonization main.daemonize() @@ -223,21 +223,30 @@ class TestMain(unittest.TestCase): # Reuse pid file when testing agent stop # Testing normal exit exists_mock.return_value = False - 
main.stop_agent() - kill_mock.assert_called_with(['ambari-sudo.sh', 'kill', '-15', pid]) - sys_exit_mock.assert_called_with(0) + try: + main.stop_agent() + raise Exception("main.stop_agent() should raise sys.exit(0).") + except SystemExit as e: + self.assertEquals(0, e.code); + + kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-15', pid]) + # Restore kill_mock.reset_mock() - sys_exit_mock.reset_mock() kill_mock.return_value = {'exitCode': 0, 'output': 'out', 'error': 'err'} # Testing exit when failed to remove pid file exists_mock.return_value = True - main.stop_agent() + + try: + main.stop_agent() + raise Exception("main.stop_agent() should raise sys.exit(0).") + except SystemExit as e: + self.assertEquals(1, e.code); + kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-15', pid]) kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-9', pid]) - sys_exit_mock.assert_called_with(1) # Restore ProcessHelper.pidfile = oldpid @@ -299,8 +308,8 @@ class TestMain(unittest.TestCase): @patch.object(main, "update_log_level") @patch.object(NetUtil.NetUtil, "try_to_connect") @patch.object(Controller, "__init__") + @patch.object(Controller, "is_alive") @patch.object(Controller, "start") - @patch.object(Controller, "join") @patch("optparse.OptionParser.parse_args") @patch.object(DataCleaner,"start") @patch.object(DataCleaner,"__init__") @@ -308,13 +317,14 @@ class TestMain(unittest.TestCase): @patch.object(PingPortListener,"__init__") @patch.object(ExitHelper,"execute_cleanup") def test_main(self, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock, - parse_args_mock, join_mock, start_mock, Controller_init_mock, try_to_connect_mock, + parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, update_log_level_mock, daemonize_mock, perform_prestart_checks_mock, ambari_config_mock, stop_mock, bind_signal_handlers_mock, setup_logging_mock, socket_mock): 
data_clean_init_mock.return_value = None Controller_init_mock.return_value = None + Controller_is_alive_mock.return_value = False ping_port_init_mock.return_value = None options = MagicMock() parse_args_mock.return_value = (options, MagicMock)
