Repository: incubator-slider
Updated Branches:
  refs/heads/develop 9644fd341 -> 5a83421b2


SLIDER-1189 Agent never connects to new AM if AM restart takes too long


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5a83421b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5a83421b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5a83421b

Branch: refs/heads/develop
Commit: 5a83421b2291298aef3cd4c99c880b5cb26d29ed
Parents: 9644fd3
Author: Billie Rinaldi <bil...@apache.org>
Authored: Wed Jan 25 11:00:08 2017 -0800
Committer: Billie Rinaldi <bil...@apache.org>
Committed: Wed Jan 25 11:00:08 2017 -0800

----------------------------------------------------------------------
 .../src/main/python/agent/Controller.py         | 40 +++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a83421b/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py
index 8c01315..db0205f 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -46,7 +46,8 @@ import security
 logger = logging.getLogger()
 
 AGENT_AUTO_RESTART_EXIT_CODE = 77
-HEART_BEAT_RETRY_THRESHOLD = 2
+HEART_BEAT_RETRY_THRESHOLD = 3
+REGISTER_RETRY_THRESHOLD = 3
 
 WS_AGENT_CONTEXT_ROOT = '/ws'
 SLIDER_PATH_AGENTS = WS_AGENT_CONTEXT_ROOT + '/v1/slider/agents/'
@@ -88,6 +89,7 @@ class Controller(threading.Thread):
     self.statusCommand = None
     self.failureCount = 0
     self.heartBeatRetryCount = 0
+    self.registerRetryCount = 0
     self.autoRestartFailures = 0
     self.autoRestartTrackingSince = 0
     self.terminateAgent = False
@@ -120,6 +122,20 @@ class Controller(threading.Thread):
       pass
     pass
 
+  def readAMDataFromRegistry(self):
+    self.cachedconnect = None # Previous connection is broken now
+    zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM)
+    zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH)
+    registry = Registry(zk_quorum, zk_reg_path)
+    amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort()
+    self.hostname = amHost
+    self.secured_port = amSecuredPort
+    self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname)
+    self.config.set(AgentConfig.SERVER_SECTION, "secured_port", self.secured_port)
+    self.server_url = 'https://' + self.hostname + ':' + self.secured_port
+    self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER
+    self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT
+
   def registerWithServer(self):
     id = -1
     ret = {}
@@ -183,6 +199,15 @@ class Controller(threading.Thread):
         logger.info("Unable to connect to: " + self.registerUrl, exc_info=True)
         """ Sleeping for {0} seconds and then retrying again """.format(delay)
         time.sleep(delay)
+        self.registerRetryCount += 1
+        logger.error(
+          "Register retry count = %d" % (self.registerRetryCount))
+        # Re-read zk registry in case AM was restarted and came up with new
+        # host/port, but do this only after register retry attempts crosses
+        # threshold
+        if self.registerRetryCount > REGISTER_RETRY_THRESHOLD:
+          self.registerRetryCount = 0
+          self.readAMDataFromRegistry()
         pass
       pass
     return regResp
@@ -407,18 +432,7 @@ class Controller(threading.Thread):
           self.isRegistered = False
           self.repeatRegistration = True
           self.heartBeatRetryCount = 0
-          self.cachedconnect = None # Previous connection is broken now
-          zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM)
-          zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH)
-          registry = Registry(zk_quorum, zk_reg_path)
-          amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort()
-          self.hostname = amHost
-          self.secured_port = amSecuredPort
-          self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname)
-          self.config.set(AgentConfig.SERVER_SECTION, "secured_port", self.secured_port)
-          self.server_url = 'https://' + self.hostname + ':' + self.secured_port
-          self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER
-          self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT
+          self.readAMDataFromRegistry()
           return
         self.cachedconnect = None # Previous connection is broken now
         retry = True

Reply via email to