AMBARI-6978. Uncaught exception in Ambari agent - it may die on connection error (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8f02714e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8f02714e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8f02714e Branch: refs/heads/branch-alerts-dev Commit: 8f02714e5bff2c943caa309f020d7060e509735e Parents: 0822a54 Author: Lisnichenko Dmitro <[email protected]> Authored: Thu Aug 21 16:04:55 2014 +0300 Committer: Lisnichenko Dmitro <[email protected]> Committed: Thu Aug 21 20:14:31 2014 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 55 +++++++------- .../test/python/ambari_agent/TestController.py | 76 ++++++++++++++++---- 2 files changed, 90 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8f02714e/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 87af939..7859a2d 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -78,17 +78,20 @@ class Controller(threading.Thread): def __del__(self): logger.info("Server connection disconnected.") pass - + def registerWithServer(self): + """ + :return: returning from current method without setting self.isRegistered + to True will lead to agent termination. 
+ """ LiveStatus.SERVICES = [] LiveStatus.CLIENT_COMPONENTS = [] LiveStatus.COMPONENTS = [] - id = -1 ret = {} while not self.isRegistered: try: - data = json.dumps(self.register.build(id)) + data = json.dumps(self.register.build()) prettyData = pprint.pformat(data) try: @@ -111,8 +114,7 @@ class Controller(threading.Thread): # log - message, which will be printed to agents log if 'log' in ret.keys(): log = ret['log'] - - logger.error(log) + logger.error(log) self.isRegistered = False self.repeatRegistration = False return ret @@ -122,23 +124,22 @@ class Controller(threading.Thread): self.responseId = int(ret['responseId']) self.isRegistered = True if 'statusCommands' in ret.keys(): - logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) ) + logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands'])) self.addToStatusQueue(ret['statusCommands']) pass else: self.hasMappedComponents = False pass except ssl.SSLError: - self.repeatRegistration=False + self.repeatRegistration = False self.isRegistered = False return except Exception: # try a reconnect only after a certain amount of random time delay = randint(0, self.range) - logger.error("Unable to connect to: " + self.registerUrl, exc_info = True) + logger.error("Unable to connect to: " + self.registerUrl, exc_info=True) """ Sleeping for {0} seconds and then retrying again """.format(delay) time.sleep(delay) - pass pass return ret @@ -147,7 +148,7 @@ class Controller(threading.Thread): if commands: self.actionQueue.cancel(commands) pass - + def addToQueue(self, commands): """Add to the queue for running the commands """ """ Put the required actions into the Queue """ @@ -178,11 +179,8 @@ class Controller(threading.Thread): self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 retry = False certVerifFailed = False - hb_interval = self.config.get('heartbeat', 'state_interval') - #TODO make sure the response id is monotonically increasing - id = 0 while not 
self.DEBUG_STOP_HEARTBEATING: try: if not retry: @@ -212,7 +210,7 @@ class Controller(threading.Thread): logger.info('Heartbeat response received (id = %s)', serverId) if 'hasMappedComponents' in response.keys(): - self.hasMappedComponents = response['hasMappedComponents'] != False + self.hasMappedComponents = response['hasMappedComponents'] is not False if 'registrationCommand' in response.keys(): # check if the registration command is None. If none skip @@ -226,7 +224,7 @@ class Controller(threading.Thread): logger.error("Error in responseId sequence - restarting") self.restartAgent() else: - self.responseId=serverId + self.responseId = serverId if 'cancelCommands' in response.keys(): self.cancelCommandInQueue(response['cancelCommands']) @@ -250,7 +248,7 @@ class Controller(threading.Thread): if retry: logger.info("Reconnected to %s", self.heartbeatUrl) - retry=False + retry = False certVerifFailed = False self.DEBUG_SUCCESSFULL_HEARTBEATS += 1 self.DEBUG_HEARTBEAT_RETRIES = 0 @@ -260,10 +258,6 @@ class Controller(threading.Thread): self.isRegistered = False return except Exception, err: - #randomize the heartbeat - delay = randint(0, self.range) - time.sleep(delay) - if "code" in err: logger.error(err.code) else: @@ -283,13 +277,17 @@ class Controller(threading.Thread): logger.warn("Server certificate verify failed. Did you regenerate server certificate?") certVerifFailed = True - self.cachedconnect = None # Previous connection is broken now - retry=True + self.cachedconnect = None # Previous connection is broken now + retry = True + + #randomize the heartbeat + delay = randint(0, self.range) + time.sleep(delay) # Sleep for some time timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \ - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS - self.heartbeat_wait_event.wait(timeout = timeout) + self.heartbeat_wait_event.wait(timeout=timeout) # Sleep a bit more to allow STATUS_COMMAND results to be collected # and sent in one heartbeat. 
Also avoid server overload with heartbeats time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) @@ -345,17 +343,16 @@ class Controller(threading.Thread): return json.loads(response) except Exception, exception: if response is None: - err_msg = 'Request to {0} failed due to {1}'.format(url, str(exception)) - return {'exitstatus': 1, 'log': err_msg} + raise IOError('Request to {0} failed due to {1}'.format(url, str(exception))) else: - err_msg = ('Response parsing failed! Request data: ' + str(data) - + '; Response: ' + str(response)) - logger.warn(err_msg) - return {'exitstatus': 1, 'log': err_msg} + raise IOError('Response parsing failed! Request data: ' + str(data) + + '; Response: ' + str(response)) + def updateComponents(self, cluster_name): logger.info("Updating components map of cluster " + cluster_name) + # May throw IOError on server connection error response = self.sendRequest(self.componentsUrl + cluster_name, None) logger.debug("Response from %s was %s", self.serverHostname, str(response)) http://git-wip-us.apache.org/repos/asf/ambari/blob/8f02714e/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 9ec23db..ad8303f 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -30,7 +30,7 @@ from threading import Event import json with patch("platform.linux_distribution", return_value = ('Suse','11','Final')): - from ambari_agent import Controller, ActionQueue + from ambari_agent import Controller, ActionQueue, Register from ambari_agent import hostname from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE from ambari_commons import OSCheck @@ -247,9 +247,9 @@ class TestController(unittest.TestCase): 
heartbeatWithServer.assert_called_once_with() self.controller.registerWithServer =\ - Controller.Controller.registerWithServer + Controller.Controller.registerWithServer self.controller.heartbeatWithServer =\ - Controller.Controller.registerWithServer + Controller.Controller.registerWithServer @patch("time.sleep") def test_registerAndHeartbeat(self, sleepMock): @@ -300,6 +300,33 @@ class TestController(unittest.TestCase): Controller.Controller.registerWithServer + @patch("time.sleep") + @patch.object(Controller.Controller, "sendRequest") + def test_registerWithIOErrors(self, sendRequestMock, sleepMock): + # Check that server continues to heartbeat after connection errors + registerMock = MagicMock(name="Register") + registerMock.build.return_value = {} + actionQueue = MagicMock() + actionQueue.isIdle.return_value = True + self.controller.actionQueue = actionQueue + self.controller.register = registerMock + self.controller.responseId = 1 + self.controller.TEST_IOERROR_COUNTER = 1 + self.controller.isRegistered = False + def util_throw_IOErrors(*args, **kwargs): + """ + Throws IOErrors 10 times and then stops heartbeats/registrations + """ + if self.controller.TEST_IOERROR_COUNTER == 10: + self.controller.isRegistered = True + self.controller.TEST_IOERROR_COUNTER += 1 + raise IOError("Sample error") + actionQueue.isIdle.return_value = False + sendRequestMock.side_effect = util_throw_IOErrors + self.controller.registerWithServer() + self.assertTrue(sendRequestMock.call_count > 5) + + @patch("os._exit") def test_restartAgent(self, os_exit_mock): @@ -331,18 +358,22 @@ class TestController(unittest.TestCase): {'Content-Type': 'application/json'}) conMock.request.return_value = '{invalid_object}' - actual = self.controller.sendRequest(url, data) - expected = {'exitstatus': 1, 'log': ('Response parsing failed! 
Request data: ' + data - + '; Response: {invalid_object}')} - self.assertEqual(actual, expected) + + try: + self.controller.sendRequest(url, data) + self.fail("Should throw exception!") + except IOError, e: # Expected + self.assertEquals('Response parsing failed! Request data: ' + data + + '; Response: {invalid_object}', e.message) exceptionMessage = "Connection Refused" conMock.request.side_effect = Exception(exceptionMessage) - actual = self.controller.sendRequest(url, data) - expected = {'exitstatus': 1, 'log': 'Request to ' + url + ' failed due to ' + exceptionMessage} - - self.assertEqual(actual, expected) - + try: + self.controller.sendRequest(url, data) + self.fail("Should throw exception!") + except IOError, e: # Expected + self.assertEquals('Request to ' + url + ' failed due to ' + + exceptionMessage, e.message) @patch.object(threading._Event, "wait") @@ -483,6 +514,27 @@ class TestController(unittest.TestCase): sleepMock.assert_called_with( self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) + # Check that server continues to heartbeat after connection errors + self.controller.responseId = 1 + self.controller.TEST_IOERROR_COUNTER = 1 + sendRequest.reset() + def util_throw_IOErrors(*args, **kwargs): + """ + Throws IOErrors 100 times and then stops heartbeats/registrations + """ + if self.controller.TEST_IOERROR_COUNTER == 10: + self.controller.DEBUG_STOP_HEARTBEATING = True + self.controller.TEST_IOERROR_COUNTER += 1 + raise IOError("Sample error") + self.controller.DEBUG_STOP_HEARTBEATING = False + actionQueue.isIdle.return_value = False + sendRequest.side_effect = util_throw_IOErrors + self.controller.heartbeatWithServer() + self.assertTrue(sendRequest.call_count > 5) + + sleepMock.assert_called_with( + self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) + sys.stdout = sys.__stdout__ self.controller.sendRequest = Controller.Controller.sendRequest self.controller.sendRequest = Controller.Controller.addToQueue
