Repository: incubator-slider Updated Branches: refs/heads/develop bd62d359a -> 51a450999
SLIDER-438 Slider agent continues to run in the container on a node where NM dies Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/51a45099 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/51a45099 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/51a45099 Branch: refs/heads/develop Commit: 51a450999ccd7e43bbfb31a578885972aa3a22c7 Parents: bd62d35 Author: Gour Saha <[email protected]> Authored: Fri Oct 17 01:21:57 2014 -0700 Committer: Gour Saha <[email protected]> Committed: Fri Oct 17 01:23:14 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/Controller.py | 6 +- .../src/test/python/agent/TestController.py | 129 ++++++++++++++++++- 2 files changed, 130 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51a45099/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 5afab83..9788acd 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -226,8 +226,6 @@ class Controller(threading.Thread): successfully and if not, then take alternate measures (like kill processes). For now if stop is triggered it is considered stopped. ''' - if not self.appGracefulStopQueued: - return False isAppStopped = False if self.appGracefulStopTriggered: isAppStopped = True @@ -285,8 +283,8 @@ class Controller(threading.Thread): logger.error("Got restartAgent command") self.restartAgent() if 'terminateAgent' in response.keys(): - terminateAgent = response['terminateAgent'] - if terminateAgent: + self.terminateAgent = response['terminateAgent'] + if self.terminateAgent: logger.error("Got terminateAgent command") self.stopApp() # Continue will add some wait time http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51a45099/slider-agent/src/test/python/agent/TestController.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py index bda8033..4070783 100644 --- a/slider-agent/src/test/python/agent/TestController.py +++ b/slider-agent/src/test/python/agent/TestController.py @@ -516,11 +516,138 @@ class TestController(unittest.TestCase): sys.stdout = sys.__stdout__ self.controller.sendRequest = Controller.Controller.sendRequest - self.controller.sendRequest = Controller.Controller.addToQueue + self.controller.addToQueue = Controller.Controller.addToQueue self.controller.config = original_value pass + @patch.object(threading._Event, "wait") + @patch("time.sleep") + @patch("json.loads") + @patch("json.dumps") + def test_heartbeatWithServerTerminateAgent(self, dumpsMock, loadsMock, sleepMock, event_mock): + original_value = self.controller.config + self.controller.config = AgentConfig("", "") + out = StringIO.StringIO() + sys.stdout = out + + hearbeat = MagicMock() + self.controller.heartbeat = hearbeat + + dumpsMock.return_value = "data" + + sendRequest = MagicMock(name="sendRequest") + self.controller.sendRequest = sendRequest + + self.controller.responseId = 1 + response = {"responseId":"2", "restartAgent": False} + loadsMock.return_value = response + + def one_heartbeat(*args, **kwargs): + self.controller.DEBUG_STOP_HEARTBEATING = True + return "data" + + sendRequest.side_effect = one_heartbeat + + actionQueue = MagicMock() + actionQueue.isIdle.return_value = True + + # one successful request, after stop + self.controller.actionQueue = actionQueue + self.controller.heartbeatWithServer() + self.assertTrue(sendRequest.called) + + calls = [] + def retry(*args, **kwargs): + if len(calls) == 0: + calls.append(1) + response["responseId"] = "3" + raise Exception() + if len(calls) > 0: + self.controller.DEBUG_STOP_HEARTBEATING = True + return "data" + + # exception, retry, successful and stop + sendRequest.side_effect = retry + self.controller.DEBUG_STOP_HEARTBEATING = False + self.controller.heartbeatWithServer() + + self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS) + + original_stopApp = self.controller.stopApp + + # terminateAgent command - test 1 + self.controller.responseId = 1 + self.controller.DEBUG_STOP_HEARTBEATING = False + response = {"responseId":"2", "terminateAgent": True} + loadsMock.return_value = response + stopApp = MagicMock(name="stopApp") + self.controller.stopApp = stopApp + self.controller.heartbeatWithServer() + stopApp.assert_called_once_with() + + # reset for next test + self.controller.terminateAgent = False + + # terminateAgent command - test 2 + self.controller.responseId = 1 + self.controller.DEBUG_STOP_HEARTBEATING = False + response = {"responseId":"2", "terminateAgent": True} + loadsMock.return_value = response + self.controller.stopApp = original_stopApp + stopCommand = {"roleCommand": "STOP"} + self.controller.stopCommand = stopCommand + addToQueue = MagicMock(name="addToQueue") + self.controller.addToQueue = addToQueue + self.controller.componentActualState = State.STARTED + self.controller.heartbeatWithServer() + self.assertTrue(self.controller.terminateAgent) + self.assertTrue(self.controller.appGracefulStopQueued) + addToQueue.assert_has_calls([call([stopCommand])]) + + # reset for next test + self.controller.terminateAgent = False + self.controller.appGracefulStopQueued = False + + # terminateAgent command - test 3 + self.controller.responseId = 1 + self.controller.DEBUG_STOP_HEARTBEATING = False + # set stopCommand to None and let it get set by updateStateBasedOnCommand + self.controller.stopCommand = None + # in this heartbeat don't send terminateAgent signal + response = {"responseId":"2", "terminateAgent": False} + stopCommand = {"roleCommand": "STOP"} + response["executionCommands"] = [stopCommand] + loadsMock.return_value = response + # terminateAgent is False - make STOP in commands set the stopCommand + self.controller.heartbeatWithServer() + self.assertFalse(self.controller.terminateAgent) + assert not self.controller.stopCommand == None + + # now no need to have STOP command in response, just send terminateAgent + self.controller.responseId = 2 + self.controller.DEBUG_STOP_HEARTBEATING = False + response = {"responseId":"3", "terminateAgent": True} + loadsMock.return_value = response + addToQueue = MagicMock(name="addToQueue") + self.controller.addToQueue = addToQueue + self.controller.stopApp = original_stopApp + self.controller.componentActualState = State.STARTED + self.controller.heartbeatWithServer() + self.assertTrue(self.controller.terminateAgent) + self.assertTrue(self.controller.appGracefulStopQueued) + addToQueue.assert_has_calls([call([stopCommand])]) + self.controller.terminateAgent = False + + sleepMock.assert_called_with( + self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) + + sys.stdout = sys.__stdout__ + self.controller.sendRequest = Controller.Controller.sendRequest + self.controller.addToQueue = Controller.Controller.addToQueue + + self.controller.config = original_value + pass @patch.object(Controller.Controller, "createStatusCommand") def test_updateStateBasedOnResult(self, mock_createStatusCommand):
