Repository: incubator-slider
Updated Branches:
  refs/heads/develop 9644fd341 -> 5a83421b2
SLIDER-1189 Agent never connects to new AM if AM restart takes too long Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5a83421b Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5a83421b Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5a83421b Branch: refs/heads/develop Commit: 5a83421b2291298aef3cd4c99c880b5cb26d29ed Parents: 9644fd3 Author: Billie Rinaldi <bil...@apache.org> Authored: Wed Jan 25 11:00:08 2017 -0800 Committer: Billie Rinaldi <bil...@apache.org> Committed: Wed Jan 25 11:00:08 2017 -0800 ---------------------------------------------------------------------- .../src/main/python/agent/Controller.py | 40 +++++++++++++------- 1 file changed, 27 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a83421b/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 8c01315..db0205f 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -46,7 +46,8 @@ import security logger = logging.getLogger() AGENT_AUTO_RESTART_EXIT_CODE = 77 -HEART_BEAT_RETRY_THRESHOLD = 2 +HEART_BEAT_RETRY_THRESHOLD = 3 +REGISTER_RETRY_THRESHOLD = 3 WS_AGENT_CONTEXT_ROOT = '/ws' SLIDER_PATH_AGENTS = WS_AGENT_CONTEXT_ROOT + '/v1/slider/agents/' @@ -88,6 +89,7 @@ class Controller(threading.Thread): self.statusCommand = None self.failureCount = 0 self.heartBeatRetryCount = 0 + self.registerRetryCount = 0 self.autoRestartFailures = 0 self.autoRestartTrackingSince = 0 self.terminateAgent = False @@ -120,6 +122,20 @@ class Controller(threading.Thread): pass pass + def readAMDataFromRegistry(self): + 
self.cachedconnect = None # Previous connection is broken now + zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM) + zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH) + registry = Registry(zk_quorum, zk_reg_path) + amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort() + self.hostname = amHost + self.secured_port = amSecuredPort + self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname) + self.config.set(AgentConfig.SERVER_SECTION, "secured_port", self.secured_port) + self.server_url = 'https://' + self.hostname + ':' + self.secured_port + self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER + self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT + def registerWithServer(self): id = -1 ret = {} @@ -183,6 +199,15 @@ class Controller(threading.Thread): logger.info("Unable to connect to: " + self.registerUrl, exc_info=True) """ Sleeping for {0} seconds and then retrying again """.format(delay) time.sleep(delay) + self.registerRetryCount += 1 + logger.error( + "Register retry count = %d" % (self.registerRetryCount)) + # Re-read zk registry in case AM was restarted and came up with new + # host/port, but do this only after register retry attempts crosses + # threshold + if self.registerRetryCount > REGISTER_RETRY_THRESHOLD: + self.registerRetryCount = 0 + self.readAMDataFromRegistry() pass pass return regResp @@ -407,18 +432,7 @@ class Controller(threading.Thread): self.isRegistered = False self.repeatRegistration = True self.heartBeatRetryCount = 0 - self.cachedconnect = None # Previous connection is broken now - zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM) - zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH) - registry = Registry(zk_quorum, zk_reg_path) - amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort() - self.hostname = 
amHost - self.secured_port = amSecuredPort - self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname) - self.config.set(AgentConfig.SERVER_SECTION, "secured_port", self.secured_port) - self.server_url = 'https://' + self.hostname + ':' + self.secured_port - self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER - self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT + self.readAMDataFromRegistry() return self.cachedconnect = None # Previous connection is broken now retry = True