Repository: ambari Updated Branches: refs/heads/trunk 7ff455102 -> fcb18659b
AMBARI-10083 - Ambari Agent Alerts Prevents Binding to the Ping Port Listener On Startup (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fcb18659 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fcb18659 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fcb18659 Branch: refs/heads/trunk Commit: fcb18659be579ff832b066f645476ffe1c3ec847 Parents: 7ff4551 Author: Jonathan Hurley <[email protected]> Authored: Mon Mar 16 11:27:55 2015 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Mon Mar 16 13:00:14 2015 -0400 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 79 +++++++++++--------- .../src/main/python/ambari_agent/Controller.py | 17 +---- .../main/python/ambari_agent/ProcessHelper.py | 6 +- .../src/main/python/ambari_agent/main.py | 8 +- .../src/test/python/ambari_agent/TestMain.py | 32 ++++---- .../package/alerts/alert_hive_metastore.py | 54 ++++++------- .../package/alerts/alert_hive_thrift_port.py | 6 +- 7 files changed, 103 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index a2ea8ef..29807e2 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -26,6 +26,8 @@ import logging import os import sys import time +import atexit + from apscheduler.scheduler import Scheduler from alerts.collector import AlertCollector from alerts.metric_alert import MetricAlert @@ -35,7 +37,6 @@ from alerts.web_alert import WebAlert logger = logging.getLogger() - class AlertSchedulerHandler(): FILENAME = 'definitions.json' TYPE_PORT = 'PORT' @@ -49,7 +50,6 @@ class AlertSchedulerHandler(): 'standalone': False } - def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir, cluster_configuration, config, in_minutes=True): @@ -71,6 +71,16 @@ class AlertSchedulerHandler(): self.__in_minutes = in_minutes self.config = config + # register python exit handler + atexit.register(self.exit_handler) + + + def exit_handler(self): + """ + Exit handler + """ + self.stop() + def update_definitions(self, heartbeat): """ @@ -116,12 +126,12 @@ class AlertSchedulerHandler(): self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG) alert_callables = self.__load_definitions() - + # schedule each definition for _callable in alert_callables: self.schedule_definition(_callable) - - logger.debug("[AlertScheduler] Starting {0}; currently running: {1}".format( + + logger.info("[AlertScheduler] Starting {0}; currently running: {1}".format( str(self.__scheduler), str(self.__scheduler.running))) self.__scheduler.start() @@ -132,22 +142,23 @@ class AlertSchedulerHandler(): self.__scheduler.shutdown(wait=False) self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG) + logger.info("[AlertScheduler] Stopped the alert scheduler.") def reschedule(self): """ - Removes jobs that are scheduled where their UUID no longer is valid. + Removes jobs that are scheduled where their UUID no longer is valid. Schedules jobs where the definition UUID is not currently scheduled. """ jobs_scheduled = 0 jobs_removed = 0 - + definitions = self.__load_definitions() scheduled_jobs = self.__scheduler.get_jobs() - + # for every scheduled job, see if its UUID is still valid for scheduled_job in scheduled_jobs: uuid_valid = False - + for definition in definitions: definition_uuid = definition.get_uuid() if scheduled_job.name == definition_uuid: @@ -160,7 +171,7 @@ class AlertSchedulerHandler(): logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name)) self._collector.remove_by_uuid(scheduled_job.name) self.__scheduler.unschedule_job(scheduled_job) - + # for every definition, determine if there is a scheduled job for definition in definitions: definition_scheduled = False @@ -169,12 +180,12 @@ class AlertSchedulerHandler(): if definition_uuid == scheduled_job.name: definition_scheduled = True break - + # if no jobs are found with the definitions UUID, schedule it if definition_scheduled == False: jobs_scheduled += 1 self.schedule_definition(definition) - + logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format( str(jobs_scheduled), str(jobs_removed))) @@ -209,7 +220,7 @@ class AlertSchedulerHandler(): def collector(self): """ gets the collector for reporting to the server """ return self._collector - + def __load_definitions(self): """ @@ -218,7 +229,7 @@ class AlertSchedulerHandler(): :return: """ definitions = [] - + all_commands = None alerts_definitions_path = os.path.join(self.cachedir, self.FILENAME) try: @@ -227,21 +238,21 @@ class AlertSchedulerHandler(): except: logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path)) return definitions - + for command_json in all_commands: clusterName = '' if not 'clusterName' in command_json else command_json['clusterName'] hostName = '' if not 'hostName' in command_json else command_json['hostName'] for definition in command_json['alertDefinitions']: alert = self.__json_to_callable(clusterName, hostName, definition) - + if alert is None: continue - + alert.set_helpers(self._collector, self._cluster_configuration) definitions.append(alert) - + return definitions @@ -255,7 +266,7 @@ class AlertSchedulerHandler(): if logger.isEnabledFor(logging.DEBUG): logger.debug("[AlertScheduler] Creating job type {0} with {1}".format(source_type, str(json_definition))) - + alert = None if source_type == AlertSchedulerHandler.TYPE_METRIC: @@ -289,7 +300,7 @@ class AlertSchedulerHandler(): logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format( definition.get_name(),definition.get_uuid())) return - + job = None if self.__in_minutes: @@ -298,15 +309,15 @@ class AlertSchedulerHandler(): else: job = self.__scheduler.add_interval_job(self.__make_function(definition), seconds=definition.interval()) - - # although the documentation states that Job(kwargs) takes a name + + # although the documentation states that Job(kwargs) takes a name # key/value pair, it does not actually set the name; do it manually if job is not None: job.name = definition.get_uuid() - + logger.info("[AlertScheduler] Scheduling {0} with UUID {1}".format( definition.get_name(), definition.get_uuid())) - + def get_job_count(self): """ @@ -315,10 +326,10 @@ class AlertSchedulerHandler(): """ if self.__scheduler is None: return 0 - + return len(self.__scheduler.get_jobs()) - + def execute_alert(self, execution_commands): """ Executes an alert immediately, ignoring any scheduled jobs. The existing @@ -331,18 +342,18 @@ class AlertSchedulerHandler(): for execution_command in execution_commands: try: alert_definition = execution_command['alertDefinition'] - + clusterName = '' if not 'clusterName' in execution_command else execution_command['clusterName'] - hostName = '' if not 'hostName' in execution_command else execution_command['hostName'] - + hostName = '' if not 'hostName' in execution_command else execution_command['hostName'] + alert = self.__json_to_callable(clusterName, hostName, alert_definition) - + if alert is None: continue - + logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(), alert.get_uuid())) - + alert.set_helpers(self._collector, self._cluster_configuration) alert.collect() except: @@ -357,7 +368,7 @@ def main(): logger.setLevel(logging.DEBUG) except TypeError: logger.setLevel(12) - + ch = logging.StreamHandler() ch.setLevel(logger.level) logger.addHandler(ch) @@ -376,7 +387,7 @@ def main(): print str(ash.collector().alerts()) ash.stop() - + if __name__ == "__main__": main() http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 2300b47..eeca4c2 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -102,7 +102,6 @@ class Controller(threading.Thread): def __del__(self): logger.info("Server connection disconnected.") - pass def registerWithServer(self): """ @@ -151,7 +150,6 @@ class Controller(threading.Thread): if 'statusCommands' in ret.keys(): logger.info("Got status commands on registration.") self.addToStatusQueue(ret['statusCommands']) - pass else: self.hasMappedComponents = False @@ -170,7 +168,7 @@ class Controller(threading.Thread): logger.error("Unable to connect to: " + self.registerUrl, exc_info=True) """ Sleeping for {0} seconds and then retrying again """.format(delay) time.sleep(delay) - pass + return ret def cancelCommandInQueue(self, commands): @@ -180,8 +178,6 @@ class Controller(threading.Thread): self.actionQueue.cancel(commands) except Exception, err: logger.error("Exception occurred on commands cancel: %s", err.message) - pass - pass def addToQueue(self, commands): """Add to the queue for running the commands """ @@ -192,7 +188,6 @@ class Controller(threading.Thread): else: """Only add to the queue if not empty list """ self.actionQueue.put(commands) - pass def addToStatusQueue(self, commands): if not commands: @@ -201,7 +196,6 @@ class Controller(threading.Thread): if not LiveStatus.SERVICES: self.updateComponents(commands[0]['clusterName']) self.actionQueue.put_status(commands) - pass # For testing purposes DEBUG_HEARTBEAT_RETRIES = 0 @@ -223,7 +217,6 @@ class Controller(threading.Thread): if not retry: data = json.dumps( self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents)) - pass else: self.DEBUG_HEARTBEAT_RETRIES += 1 @@ -333,7 +326,6 @@ class Controller(threading.Thread): # Stop loop when stop event received logger.info("Stop event received") self.DEBUG_STOP_HEARTBEATING=True - pass def run(self): self.actionQueue = ActionQueue(self.config, controller=self) @@ -350,8 +342,6 @@ class Controller(threading.Thread): if not self.repeatRegistration: break - pass - def registerAndHeartbeat(self): registerResponse = self.registerWithServer() message = registerResponse['response'] @@ -373,8 +363,8 @@ class Controller(threading.Thread): self.heartbeatWithServer() def restartAgent(self): - os._exit(AGENT_AUTO_RESTART_EXIT_CODE) - pass + sys.exit(AGENT_AUTO_RESTART_EXIT_CODE) + def sendRequest(self, url, data): response = None @@ -411,7 +401,6 @@ class Controller(threading.Thread): logger.debug("LiveStatus.SERVICES" + str(LiveStatus.SERVICES)) logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS)) logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS)) - pass def main(argv=None): # Allow Ctrl-C http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py b/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py index 2d99dd1..bc2f827 100644 --- a/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py +++ b/ambari-agent/src/main/python/ambari_agent/ProcessHelper.py @@ -35,7 +35,6 @@ pidfile = os.path.join(piddir, "ambari-agent.pid") def _clean(): - logger.info("Removing pid file") try: os.unlink(pidfile) @@ -54,14 +53,11 @@ def _clean(): def stopAgent(): - _clean() - os._exit(0) - pass + sys.exit(0) def restartAgent(): - _clean() executable = sys.executable http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index ebf0781..23f575b 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -158,7 +158,7 @@ def stop_agent(): time.sleep(5) if os.path.exists(ProcessHelper.pidfile): raise Exception("PID file still exists.") - os._exit(0) + sys.exit(0) except Exception, err: if pid == -1: print ("Agent process is not running") @@ -166,7 +166,7 @@ def stop_agent(): res = runner.run([AMBARI_SUDO_BINARY, 'kill', '-9', str(pid)]) if res['exitCode'] != 0: raise Exception("Error while performing agent stop. " + res['error'] + res['output']) - os._exit(1) + sys.exit(1) def reset_agent(options): try: @@ -191,9 +191,9 @@ def reset_agent(options): os.rmdir(os.path.join(root, name)) except Exception, err: print("A problem occurred while trying to reset the agent: " + str(err)) - os._exit(1) + sys.exit(1) - os._exit(0) + sys.exit(0) # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index 0a3e878..23d532a 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -25,8 +25,8 @@ import signal import os import socket import tempfile -import platform import ConfigParser + from ambari_commons import OSCheck from only_for_platform import only_for_platform, get_platform, PLATFORM_WINDOWS, PLATFORM_LINUX from mock.mock import MagicMock, patch, ANY, Mock @@ -62,16 +62,16 @@ class TestMain(unittest.TestCase): @only_for_platform(PLATFORM_LINUX) @patch("ambari_agent.HeartbeatHandlers.HeartbeatStopHandlersLinux") - @patch("os._exit") + @patch("sys.exit") @patch("os.getpid") @patch.object(ProcessHelper, "stopAgent") - def test_signal_handler(self, stopAgent_mock, os_getpid_mock, os_exit_mock, heartbeat_handler_mock): + def test_signal_handler(self, stopAgent_mock, os_getpid_mock, sys_exit_mock, heartbeat_handler_mock): # testing exit of children main.agentPid = 4444 os_getpid_mock.return_value = 5555 HeartbeatHandlers.signal_handler("signum", "frame") heartbeat_handler_mock.set_stop.assert_called() - os_exit_mock.reset_mock() + sys_exit_mock.reset_mock() # testing exit of main process os_getpid_mock.return_value = main.agentPid @@ -202,9 +202,9 @@ class TestMain(unittest.TestCase): @only_for_platform(PLATFORM_LINUX) @patch("time.sleep") @patch.object(shellRunner,"run") - @patch("os._exit") + @patch("sys.exit") @patch("os.path.exists") - def test_daemonize_and_stop(self, exists_mock, _exit_mock, kill_mock, sleep_mock): + def test_daemonize_and_stop(self, exists_mock, sys_exit_mock, kill_mock, sleep_mock): oldpid = ProcessHelper.pidfile pid = str(os.getpid()) _, tmpoutfile = tempfile.mkstemp() @@ -220,11 +220,11 @@ class TestMain(unittest.TestCase): exists_mock.return_value = False main.stop_agent() kill_mock.assert_called_with(['ambari-sudo.sh', 'kill', '-15', pid]) - _exit_mock.assert_called_with(0) + sys_exit_mock.assert_called_with(0) # Restore kill_mock.reset_mock() - _exit_mock.reset_mock() + sys_exit_mock.reset_mock() kill_mock.return_value = {'exitCode': 0, 'output': 'out', 'error': 'err'} # Testing exit when failed to remove pid file @@ -232,7 +232,7 @@ class TestMain(unittest.TestCase): main.stop_agent() kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-15', pid]) kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-9', pid]) - _exit_mock.assert_called_with(1) + sys_exit_mock.assert_called_with(1) # Restore ProcessHelper.pidfile = oldpid @@ -242,10 +242,10 @@ class TestMain(unittest.TestCase): @patch("os.path.join") @patch('__builtin__.open') @patch.object(ConfigParser, "ConfigParser") - @patch("os._exit") + @patch("sys.exit") @patch("os.walk") @patch("os.remove") - def test_reset(self, os_remove_mock, os_walk_mock, os_exit_mock, config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock): + def test_reset(self, os_remove_mock, os_walk_mock, sys_exit_mock, config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock): # Agent config update config_mock = MagicMock() os_walk_mock.return_value = [('/', ('',), ('file1.txt', 'file2.txt'))] @@ -256,14 +256,18 @@ class TestMain(unittest.TestCase): self.assertEqual(config_mock.set.call_count, 1) self.assertEqual(os_remove_mock.call_count, 2) + self.assertTrue(sys_exit_mock.called) + @patch("os.rmdir") @patch("os.path.join") @patch('__builtin__.open') @patch.object(ConfigParser, "ConfigParser") - @patch("os._exit") + @patch("sys.exit") @patch("os.walk") @patch("os.remove") - def test_reset_invalid_path(self, os_remove_mock, os_walk_mock, os_exit_mock, config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock): + def test_reset_invalid_path(self, os_remove_mock, os_walk_mock, sys_exit_mock, + config_parser_mock, open_mock, os_path_join_mock, os_rmdir_mock): + # Agent config file cannot be accessed config_mock = MagicMock() os_walk_mock.return_value = [('/', ('',), ('file1.txt', 'file2.txt'))] @@ -276,6 +280,8 @@ class TestMain(unittest.TestCase): except: self.assertTrue(True) + self.assertTrue(sys_exit_mock.called) + @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(socket, "gethostbyname") http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py index 120c4a0..81fe94c 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_metastore.py @@ -20,12 +20,13 @@ limitations under the License. import socket import time + from resource_management.libraries.functions import format from resource_management.libraries.functions import get_kinit_path from resource_management.core.resources import Execute -OK_MESSAGE = "Metastore OK - %.4f response" -CRITICAL_MESSAGE = "Connection to metastore failed on host {0}" +OK_MESSAGE = "Metastore OK - Hive command took {0:.3f}s" +CRITICAL_MESSAGE = "Metastore on {0} failed ({1})" SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' SMOKEUSER_KEYTAB_KEY = '{{cluster-env/smokeuser_keytab}}' @@ -72,18 +73,20 @@ def execute(parameters=None, host_name=None): result_code = None - if security_enabled: - smokeuser_keytab = SMOKEUSER_KEYTAB_DEFAULT - if SMOKEUSER_KEYTAB_KEY in parameters: - smokeuser_keytab = parameters[SMOKEUSER_KEYTAB_KEY] - kinit_path_local = get_kinit_path() - kinitcmd=format("{kinit_path_local} -kt {smokeuser_keytab} {smokeuser}; ") - Execute(kinitcmd, - user=smokeuser, - path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"], - ) - try: + if security_enabled: + smokeuser_keytab = SMOKEUSER_KEYTAB_DEFAULT + + if SMOKEUSER_KEYTAB_KEY in parameters: + smokeuser_keytab = parameters[SMOKEUSER_KEYTAB_KEY] + + kinit_path_local = get_kinit_path() + kinitcmd=format("{kinit_path_local} -kt {smokeuser_keytab} {smokeuser}; ") + + Execute(kinitcmd, user=smokeuser, + path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"], + timeout=10) + if host_name is None: host_name = socket.getfqdn() @@ -92,24 +95,21 @@ def execute(parameters=None, host_name=None): metastore_uri = uri cmd = format("hive --hiveconf hive.metastore.uris={metastore_uri} -e 'show databases;'") + start_time = time.time() + try: - Execute(cmd, - user=smokeuser, - path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"], - timeout=240 - ) - is_metastore_ok = True - except: - is_metastore_ok = False - - if is_metastore_ok == True: - result_code = 'OK' + Execute(cmd, user=smokeuser, + path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"], + timeout=30 ) + total_time = time.time() - start_time - label = OK_MESSAGE % (total_time) - else: + + result_code = 'OK' + label = OK_MESSAGE.format(total_time) + except Exception, exception: result_code = 'CRITICAL' - label = CRITICAL_MESSAGE.format(host_name) + label = CRITICAL_MESSAGE.format(host_name, exception.message) except Exception, e: label = str(e) http://git-wip-us.apache.org/repos/asf/ambari/blob/fcb18659/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py index c496717..0fb8898 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py @@ -110,8 +110,10 @@ def execute(parameters=None, host_name=None): start_time = time.time() try: - hive_check.check_thrift_port_sasl(host_name, port, hive_server2_authentication, - hive_server_principal, kinitcmd, smokeuser, transport_mode = transport_mode) + hive_check.check_thrift_port_sasl(host_name, port, + hive_server2_authentication, hive_server_principal, kinitcmd, smokeuser, + transport_mode = transport_mode) + is_thrift_port_ok = True except: is_thrift_port_ok = False
