Repository: ambari Updated Branches: refs/heads/trunk 9c25b6ce6 -> dfedb0313
AMBARI-6536. Add more info about the Heartbeat message from Ambari Agent to Server in its logs (Jonathan Hurley via ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfedb031 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfedb031 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfedb031 Branch: refs/heads/trunk Commit: dfedb0313552530827eba7899cb5250ae5c9d7d1 Parents: 9c25b6c Author: Nate Cole <nc...@hortonworks.com> Authored: Mon Jul 7 09:51:12 2014 -0400 Committer: Nate Cole <nc...@hortonworks.com> Committed: Mon Jul 7 09:51:12 2014 -0400 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 117 +++++++++++++------ .../src/main/python/ambari_agent/Heartbeat.py | 18 +-- .../src/main/python/ambari_agent/LiveStatus.py | 4 +- .../src/main/python/ambari_agent/NetUtil.py | 17 ++- .../src/main/python/ambari_agent/main.py | 15 ++- .../test/python/ambari_agent/TestController.py | 7 +- .../src/test/python/ambari_agent/TestMain.py | 12 +- 7 files changed, 124 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 9839313..bf68616 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -23,6 +23,7 @@ import signal import json import sys import os +import socket import time import threading import urllib2 @@ -54,7 +55,8 @@ class Controller(threading.Thread): self.credential = None self.config = config self.hostname = hostname.hostname() - server_secured_url = 'https://' + config.get('server', 'hostname') + \ + self.serverHostname = config.get('server', 'hostname') + server_secured_url = 'https://' + self.serverHostname + \ ':' + config.get('server', 'secured_url_port') self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname @@ -85,28 +87,39 @@ class Controller(threading.Thread): ret = {} while not self.isRegistered: - try: + try: data = json.dumps(self.register.build(id)) - logger.info("Registering with the server " + pprint.pformat(data)) + prettyData = pprint.pformat(data) + + try: + server_ip = socket.gethostbyname(self.hostname) + logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData) + except socket.error: + logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)", + self.hostname, prettyData) + ret = self.sendRequest(self.registerUrl, data) - exitstatus = 0 + # exitstatus is a code of error which was rised on server side. # exitstatus = 0 (OK - Default) - # exitstatus = 1 (Registration failed because - # different version of agent and server) + # exitstatus = 1 (Registration failed because different version of agent and server) + exitstatus = 0 if 'exitstatus' in ret.keys(): exitstatus = int(ret['exitstatus']) - # log - message, which will be printed to agents log - if 'log' in ret.keys(): - log = ret['log'] + if exitstatus == 1: + # log - message, which will be printed to agents log + if 'log' in ret.keys(): + log = ret['log'] + logger.error(log) self.isRegistered = False self.repeatRegistration=False return ret - logger.info("Registered with the server with " + pprint.pformat(ret)) - print("Registered with the server") - self.responseId= int(ret['responseId']) + + logger.info("Registration Successful (response=%s)", pprint.pformat(ret)) + + self.responseId = int(ret['responseId']) self.isRegistered = True if 'statusCommands' in ret.keys(): logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) ) @@ -119,10 +132,10 @@ class Controller(threading.Thread): self.repeatRegistration=False self.isRegistered = False return - except Exception, err: + except Exception: # try a reconnect only after a certain amount of random time delay = randint(0, self.range) - logger.info("Unable to connect to: " + self.registerUrl, exc_info = True) + logger.error("Unable to connect to: " + self.registerUrl, exc_info = True) """ Sleeping for {0} seconds and then retrying again """.format(delay) time.sleep(delay) pass @@ -135,7 +148,7 @@ class Controller(threading.Thread): """ Put the required actions into the Queue """ """ Verify if the action is to reboot or not """ if not commands: - logger.debug("No commands from the server : " + pprint.pformat(commands)) + logger.debug("No commands received from %s", self.serverHostname) else: """Only add to the queue if not empty list """ self.actionQueue.put(commands) @@ -143,7 +156,7 @@ class Controller(threading.Thread): def addToStatusQueue(self, commands): if not commands: - logger.debug("No status commands from the server : " + pprint.pformat(commands)) + logger.debug("No status commands received from %s", self.serverHostname) else: if not LiveStatus.SERVICES: self.updateComponents(commands[0]['clusterName']) @@ -171,15 +184,28 @@ class Controller(threading.Thread): if not retry: data = json.dumps( self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents)) - logger.debug("Sending request: " + data) pass else: self.DEBUG_HEARTBEAT_RETRIES += 1 - response = self.sendRequest(self.heartbeatUrl, data) - logger.debug('Got server response: ' + pprint.pformat(response)) + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data) + + response = self.sendRequest(self.heartbeatUrl, data) - serverId=int(response['responseId']) + exitStatus = 0 + if 'exitstatus' in response.keys(): + exitStatus = int(response['exitstatus']) + + if exitStatus != 0: + raise Exception(response) + + serverId = int(response['responseId']) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug('Heartbeat response (id = %s): %s', serverId, pprint.pformat(response)) + else: + logger.info('Heartbeat response received (id = %s)', serverId) if 'hasMappedComponents' in response.keys(): self.hasMappedComponents = response['hasMappedComponents'] != False @@ -192,7 +218,7 @@ class Controller(threading.Thread): self.repeatRegistration = True return - if serverId!=self.responseId+1: + if serverId != self.responseId + 1: logger.error("Error in responseId sequence - restarting") self.restartAgent() else: @@ -201,19 +227,21 @@ class Controller(threading.Thread): if 'executionCommands' in response.keys(): self.addToQueue(response['executionCommands']) pass + if 'statusCommands' in response.keys(): self.addToStatusQueue(response['statusCommands']) pass + if "true" == response['restartAgent']: - logger.error("Got restartAgent command") + logger.error("Received the restartAgent command") self.restartAgent() else: - logger.info("No commands sent from the Server.") + logger.info("No commands sent from %s", self.serverHostname) pass if retry: - print("Reconnected to the server") - logger.info("Reconnected to the server") + logger.info("Reconnected to %s", self.heartbeatUrl) + retry=False certVerifFailed = False self.DEBUG_SUCCESSFULL_HEARTBEATS += 1 @@ -227,18 +255,29 @@ class Controller(threading.Thread): #randomize the heartbeat delay = randint(0, self.range) time.sleep(delay) + if "code" in err: logger.error(err.code) else: - logger.error("Unable to connect to: " + self.heartbeatUrl + " due to " + str(err)) - logger.debug("Details: " + str(err), exc_info=True) + logException = False + if logger.isEnabledFor(logging.DEBUG): + logException = True + + exceptionMessage = str(err) + errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage) + if not retry: - print("Connection to the server was lost. Reconnecting...") + errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage) + + logger.error(errorMessage, exc_info=logException) + if 'certificate verify failed' in str(err) and not certVerifFailed: - print("Server certificate verify failed. Did you regenerate server certificate?") + logger.warn("Server certificate verify failed. Did you regenerate server certificate?") certVerifFailed = True + self.cachedconnect = None # Previous connection is broken now retry=True + # Sleep for some time timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \ - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS @@ -268,11 +307,13 @@ class Controller(threading.Thread): def registerAndHeartbeat(self): registerResponse = self.registerWithServer() message = registerResponse['response'] - logger.info("Response from server = " + message) + logger.info("Registration response from %s was %s", self.serverHostname, message) + if self.isRegistered: # Process callbacks for callback in self.registration_listeners: callback() + time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC) self.heartbeatWithServer() @@ -281,17 +322,17 @@ class Controller(threading.Thread): pass def sendRequest(self, url, data): + response = None + try: if self.cachedconnect is None: # Lazy initialization self.cachedconnect = security.CachedHTTPSConnection(self.config) - req = urllib2.Request(url, data, {'Content-Type': 'application/json'}) - response = None + req = urllib2.Request(url, data, {'Content-Type': 'application/json'}) response = self.cachedconnect.request(req) return json.loads(response) - except Exception: + except Exception, exception: if response is None: - err_msg = 'Request failed! Data: ' + str(data) - logger.warn(err_msg) + err_msg = 'Request to {0} failed due to {1}'.format(url, str(exception)) return {'exitstatus': 1, 'log': err_msg} else: err_msg = ('Response parsing failed! Request data: ' + str(data) @@ -301,8 +342,10 @@ class Controller(threading.Thread): def updateComponents(self, cluster_name): logger.info("Updating components map of cluster " + cluster_name) - response = self.sendRequest(self.componentsUrl + cluster_name, None) - logger.debug("Response from server = " + str(response)) + + response = self.sendRequest(self.componentsUrl + cluster_name, None) + logger.debug("Response from %s was %s", self.serverHostname, str(response)) + for service, components in response['components'].items(): LiveStatus.SERVICES.append(service) for component, category in components.items(): http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/Heartbeat.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py index 6c7543e..65a7a33 100644 --- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py +++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py @@ -58,6 +58,7 @@ class Heartbeat: commandsInProgress = False if not self.actionQueue.commandQueue.empty(): commandsInProgress = True + if len(queueResult) != 0: heartbeat['reports'] = queueResult['reports'] heartbeat['componentStatus'] = queueResult['componentStatus'] @@ -70,23 +71,26 @@ class Heartbeat: if int(id) == 0: componentsMapped = False - logger.info("Sending heartbeat with response id: " + str(id) + " and " - "timestamp: " + str(timestamp) + - ". Command(s) in progress: " + repr(commandsInProgress) + - ". Components mapped: " + repr(componentsMapped)) - logger.debug("Heartbeat : " + pformat(heartbeat)) + logger.info("Building Heartbeat: {responseId = %s, timestamp = %s, commandsInProgress = %s, componentsMapped = %s}", + str(id), str(timestamp), repr(commandsInProgress), repr(componentsMapped)) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Heartbeat: %s", pformat(heartbeat)) if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) == 0: hostInfo = HostInfo(self.config) nodeInfo = { } + # for now, just do the same work as registration # this must be the last step before returning heartbeat hostInfo.register(nodeInfo, componentsMapped, commandsInProgress) heartbeat['agentEnv'] = nodeInfo - logger.debug("agentEnv : " + str(nodeInfo)) mounts = Hardware.osdisks() heartbeat['mounts'] = mounts - logger.debug("mounts : " + str(mounts)) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("agentEnv: %s", str(nodeInfo)) + logger.debug("mounts: %s", str(mounts)) return heartbeat http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/LiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py index 49cea62..49189c8 100644 --- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py +++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py @@ -58,7 +58,6 @@ class LiveStatus: """ global SERVICES, CLIENT_COMPONENTS, COMPONENTS, LIVE_STATUS, DEAD_STATUS - livestatus = None component = {"serviceName" : self.service, "componentName" : self.component} if forsed_component_status: # If already determined status = forsed_component_status # Nothing to do @@ -73,7 +72,7 @@ class LiveStatus: logger.warn("There is no service to pid mapping for " + self.component) status = self.LIVE_STATUS if serviceStatus else self.DEAD_STATUS - livestatus ={"componentName" : self.component, + livestatus = {"componentName" : self.component, "msg" : "", "status" : status, "clusterName" : self.cluster, @@ -81,6 +80,7 @@ class LiveStatus: "stackVersion": self.versionsHandler. read_stack_version(self.component) } + active_config = self.actualConfigHandler.read_actual_component(self.component) if not active_config is None: livestatus['configurationTags'] = active_config http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/NetUtil.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py b/ambari-agent/src/main/python/ambari_agent/NetUtil.py index ece7b8f..42d3875 100644 --- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py +++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py @@ -39,18 +39,22 @@ class NetUtil: """Try to connect to a given url. Result is True if url returns HTTP code 200, in any other case (like unreachable server or wrong HTTP code) result will be False """ - logger.info("Connecting to the following url " + url); + logger.info("Connecting to " + url); + try: parsedurl = urlparse(url) ca_connection = httplib.HTTPSConnection(parsedurl[1]) - ca_connection.request("GET", parsedurl[2]) + ca_connection.request("HEAD", parsedurl[2]) response = ca_connection.getresponse() status = response.status - logger.info("Calling url received " + str(status)) - if status == 200: + requestLogMessage = "HEAD %s -> %s" + + if status == 200: + logger.debug(requestLogMessage, url, str(status) ) return True else: + logger.warning(requestLogMessage, url, str(status) ) return False except SSLError as slerror: logger.error(str(slerror)) @@ -69,7 +73,8 @@ class NetUtil: Returns count of retries """ if logger is not None: - logger.info("DEBUG: Trying to connect to the server at " + server_url) + logger.debug("Trying to connect to %s", server_url) + retries = 0 while (max_retries == -1 or retries < max_retries) and not self.DEBUG_STOP_RETRIES_FLAG: server_is_up = self.checkURL(self.SERVER_STATUS_REQUEST.format(server_url)) @@ -77,7 +82,7 @@ class NetUtil: break else: if logger is not None: - logger.info('Server at {0} is not reachable, sleeping for {1} seconds...'.format(server_url, + logger.warn('Server at {0} is not reachable, sleeping for {1} seconds...'.format(server_url, self.CONNECT_SERVER_RETRY_INTERVAL_SEC)) retries += 1 time.sleep(self.CONNECT_SERVER_RETRY_INTERVAL_SEC) http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 78dde9e..7ea70ea 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import logging import logging.handlers import signal from optparse import OptionParser @@ -30,12 +29,11 @@ import ConfigParser import ProcessHelper from Controller import Controller import AmbariConfig -from security import CertificateManager from NetUtil import NetUtil from PingPortListener import PingPortListener -import security import hostname from DataCleaner import DataCleaner +import socket logger = logging.getLogger() formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s" @@ -222,9 +220,14 @@ def main(): update_log_level(config) - server_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'url_port') - print("Connecting to the server at " + server_url + "...") - logger.info('Connecting to the server at: ' + server_url) + server_hostname = config.get('server', 'hostname') + server_url = 'https://' + server_hostname + ':' + config.get('server', 'url_port') + + try: + server_ip = socket.gethostbyname(server_hostname) + logger.info('Connecting to Ambari server at %s (%s)', server_url, server_ip) + except socket.error: + logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname) # Wait until server is reachable netutil = NetUtil() http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 30e03c4..dd92e06 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -307,7 +307,7 @@ class TestController(unittest.TestCase): conMock = MagicMock() security_mock.CachedHTTPSConnection.return_value = conMock - url = "url" + url = "http://ambari.apache.org:8081/agent" data = "data" requestMock.return_value = "request" @@ -329,9 +329,10 @@ class TestController(unittest.TestCase): + '; Response: {invalid_object}')} self.assertEqual(actual, expected) - conMock.request.side_effect = Exception() + exceptionMessage = "Connection Refused" + conMock.request.side_effect = Exception(exceptionMessage) actual = self.controller.sendRequest(url, data) - expected = {'exitstatus': 1, 'log': 'Request failed! Data: ' + data} + expected = {'exitstatus': 1, 'log': 'Request to ' + url + ' failed due to ' + exceptionMessage} self.assertEqual(actual, expected) http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index afe9b59..7cf14b6 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -19,14 +19,14 @@ limitations under the License. ''' import StringIO import sys -from mock.mock import MagicMock, patch, ANY import unittest import logging import signal -import ConfigParser import os +import socket import tempfile -from optparse import OptionParser + +from mock.mock import MagicMock, patch, ANY with patch("platform.linux_distribution", return_value = ('Suse','11','Final')): from ambari_agent import NetUtil, security @@ -226,6 +226,7 @@ class TestMain(unittest.TestCase): os.remove(tmpoutfile) + @patch.object(socket, "gethostbyname") @patch.object(main, "setup_logging") @patch.object(main, "bind_signal_handlers") @patch.object(main, "stop_agent") @@ -241,11 +242,12 @@ class TestMain(unittest.TestCase): @patch.object(DataCleaner,"start") @patch.object(DataCleaner,"__init__") @patch.object(PingPortListener,"start") - @patch.object(PingPortListener,"__init__") + @patch.object(PingPortListener,"__init__") def test_main(self, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock, parse_args_mock, join_mock, start_mock, Controller_init_mock, try_to_connect_mock, update_log_level_mock, daemonize_mock, perform_prestart_checks_mock, - resolve_ambari_config_mock, stop_mock, bind_signal_handlers_mock, setup_logging_mock): + resolve_ambari_config_mock, stop_mock, bind_signal_handlers_mock, + setup_logging_mock, socket_mock): data_clean_init_mock.return_value = None Controller_init_mock.return_value = None ping_port_init_mock.return_value = None