Repository: incubator-slider
Updated Branches:
  refs/heads/develop bd62d359a -> 51a450999


SLIDER-438 Slider agent continues to run in the container on a node where NM 
dies


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/51a45099
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/51a45099
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/51a45099

Branch: refs/heads/develop
Commit: 51a450999ccd7e43bbfb31a578885972aa3a22c7
Parents: bd62d35
Author: Gour Saha <[email protected]>
Authored: Fri Oct 17 01:21:57 2014 -0700
Committer: Gour Saha <[email protected]>
Committed: Fri Oct 17 01:23:14 2014 -0700

----------------------------------------------------------------------
 .../src/main/python/agent/Controller.py         |   6 +-
 .../src/test/python/agent/TestController.py     | 129 ++++++++++++++++++-
 2 files changed, 130 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51a45099/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py 
b/slider-agent/src/main/python/agent/Controller.py
index 5afab83..9788acd 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -226,8 +226,6 @@ class Controller(threading.Thread):
           successfully and if not, then take alternate measures (like kill
           processes). For now if stop is triggered it is considered stopped.
     '''
-    if not self.appGracefulStopQueued:
-      return False
     isAppStopped = False
     if self.appGracefulStopTriggered:
       isAppStopped = True
@@ -285,8 +283,8 @@ class Controller(threading.Thread):
             logger.error("Got restartAgent command")
             self.restartAgent()
         if 'terminateAgent' in response.keys():
-          terminateAgent = response['terminateAgent']
-          if terminateAgent:
+          self.terminateAgent = response['terminateAgent']
+          if self.terminateAgent:
             logger.error("Got terminateAgent command")
             self.stopApp()
             # Continue will add some wait time

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51a45099/slider-agent/src/test/python/agent/TestController.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestController.py 
b/slider-agent/src/test/python/agent/TestController.py
index bda8033..4070783 100644
--- a/slider-agent/src/test/python/agent/TestController.py
+++ b/slider-agent/src/test/python/agent/TestController.py
@@ -516,11 +516,138 @@ class TestController(unittest.TestCase):
 
     sys.stdout = sys.__stdout__
     self.controller.sendRequest = Controller.Controller.sendRequest
-    self.controller.sendRequest = Controller.Controller.addToQueue
+    self.controller.addToQueue = Controller.Controller.addToQueue
 
     self.controller.config = original_value
     pass
 
+  @patch.object(threading._Event, "wait")
+  @patch("time.sleep")
+  @patch("json.loads")
+  @patch("json.dumps")
+  def test_heartbeatWithServerTerminateAgent(self, dumpsMock, loadsMock, 
sleepMock, event_mock):
+    original_value = self.controller.config
+    self.controller.config = AgentConfig("", "")
+    out = StringIO.StringIO()
+    sys.stdout = out
+
+    hearbeat = MagicMock()
+    self.controller.heartbeat = hearbeat
+
+    dumpsMock.return_value = "data"
+
+    sendRequest = MagicMock(name="sendRequest")
+    self.controller.sendRequest = sendRequest
+
+    self.controller.responseId = 1
+    response = {"responseId":"2", "restartAgent": False}
+    loadsMock.return_value = response
+
+    def one_heartbeat(*args, **kwargs):
+      self.controller.DEBUG_STOP_HEARTBEATING = True
+      return "data"
+
+    sendRequest.side_effect = one_heartbeat
+
+    actionQueue = MagicMock()
+    actionQueue.isIdle.return_value = True
+
+    # one successful request, after stop
+    self.controller.actionQueue = actionQueue
+    self.controller.heartbeatWithServer()
+    self.assertTrue(sendRequest.called)
+
+    calls = []
+    def retry(*args, **kwargs):
+      if len(calls) == 0:
+        calls.append(1)
+        response["responseId"] = "3"
+        raise Exception()
+      if len(calls) > 0:
+        self.controller.DEBUG_STOP_HEARTBEATING = True
+      return "data"
+
+    # exception, retry, successful and stop
+    sendRequest.side_effect = retry
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    self.controller.heartbeatWithServer()
+
+    self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
+
+    original_stopApp = self.controller.stopApp
+
+    # terminateAgent command - test 1
+    self.controller.responseId = 1
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    response = {"responseId":"2", "terminateAgent": True}
+    loadsMock.return_value = response
+    stopApp = MagicMock(name="stopApp")
+    self.controller.stopApp = stopApp
+    self.controller.heartbeatWithServer()
+    stopApp.assert_called_once_with()
+    
+    # reset for next test
+    self.controller.terminateAgent = False
+
+    # terminateAgent command - test 2
+    self.controller.responseId = 1
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    response = {"responseId":"2", "terminateAgent": True}
+    loadsMock.return_value = response
+    self.controller.stopApp = original_stopApp
+    stopCommand = {"roleCommand": "STOP"}
+    self.controller.stopCommand = stopCommand
+    addToQueue = MagicMock(name="addToQueue")
+    self.controller.addToQueue = addToQueue
+    self.controller.componentActualState = State.STARTED
+    self.controller.heartbeatWithServer()
+    self.assertTrue(self.controller.terminateAgent)
+    self.assertTrue(self.controller.appGracefulStopQueued)
+    addToQueue.assert_has_calls([call([stopCommand])])
+
+    # reset for next test
+    self.controller.terminateAgent = False
+    self.controller.appGracefulStopQueued = False
+
+    # terminateAgent command - test 3
+    self.controller.responseId = 1
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    # set stopCommand to None and let it get set by updateStateBasedOnCommand
+    self.controller.stopCommand = None
+    # in this heartbeat don't send terminateAgent signal
+    response = {"responseId":"2", "terminateAgent": False}
+    stopCommand = {"roleCommand": "STOP"}
+    response["executionCommands"] = [stopCommand]
+    loadsMock.return_value = response
+    # terminateAgent is False - make STOP in commands set the stopCommand
+    self.controller.heartbeatWithServer()
+    self.assertFalse(self.controller.terminateAgent)
+    assert not self.controller.stopCommand == None
+
+    # now no need to have STOP command in response, just send terminateAgent
+    self.controller.responseId = 2
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    response = {"responseId":"3", "terminateAgent": True}
+    loadsMock.return_value = response
+    addToQueue = MagicMock(name="addToQueue")
+    self.controller.addToQueue = addToQueue
+    self.controller.stopApp = original_stopApp
+    self.controller.componentActualState = State.STARTED
+    self.controller.heartbeatWithServer()
+    self.assertTrue(self.controller.terminateAgent)
+    self.assertTrue(self.controller.appGracefulStopQueued)
+    addToQueue.assert_has_calls([call([stopCommand])])
+    self.controller.terminateAgent = False
+
+    sleepMock.assert_called_with(
+      self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+
+    sys.stdout = sys.__stdout__
+    self.controller.sendRequest = Controller.Controller.sendRequest
+    self.controller.addToQueue = Controller.Controller.addToQueue
+
+    self.controller.config = original_value
+    pass
 
   @patch.object(Controller.Controller, "createStatusCommand")
   def test_updateStateBasedOnResult(self, mock_createStatusCommand):

Reply via email to