Repository: ambari
Updated Branches:
  refs/heads/trunk 9c25b6ce6 -> dfedb0313


AMBARI-6536. Add more detail to the Ambari Agent logs about the registration and heartbeat messages exchanged with the Ambari Server (Jonathan Hurley via ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfedb031
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfedb031
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfedb031

Branch: refs/heads/trunk
Commit: dfedb0313552530827eba7899cb5250ae5c9d7d1
Parents: 9c25b6c
Author: Nate Cole <nc...@hortonworks.com>
Authored: Mon Jul 7 09:51:12 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Mon Jul 7 09:51:12 2014 -0400

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Controller.py  | 117 +++++++++++++------
 .../src/main/python/ambari_agent/Heartbeat.py   |  18 +--
 .../src/main/python/ambari_agent/LiveStatus.py  |   4 +-
 .../src/main/python/ambari_agent/NetUtil.py     |  17 ++-
 .../src/main/python/ambari_agent/main.py        |  15 ++-
 .../test/python/ambari_agent/TestController.py  |   7 +-
 .../src/test/python/ambari_agent/TestMain.py    |  12 +-
 7 files changed, 124 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py 
b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 9839313..bf68616 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -23,6 +23,7 @@ import signal
 import json
 import sys
 import os
+import socket
 import time
 import threading
 import urllib2
@@ -54,7 +55,8 @@ class Controller(threading.Thread):
     self.credential = None
     self.config = config
     self.hostname = hostname.hostname()
-    server_secured_url = 'https://' + config.get('server', 'hostname') + \
+    self.serverHostname = config.get('server', 'hostname')
+    server_secured_url = 'https://' + self.serverHostname + \
                          ':' + config.get('server', 'secured_url_port')
     self.registerUrl = server_secured_url + '/agent/v1/register/' + 
self.hostname
     self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + 
self.hostname
@@ -85,28 +87,39 @@ class Controller(threading.Thread):
     ret = {}
 
     while not self.isRegistered:
-      try:
+      try:                
         data = json.dumps(self.register.build(id))
-        logger.info("Registering with the server " + pprint.pformat(data))
+        prettyData = pprint.pformat(data)
+        
+        try:
+          server_ip = socket.gethostbyname(self.hostname)
+          logger.info("Registering with %s (%s) (agent=%s)", self.hostname, 
server_ip, prettyData)
+        except socket.error:          
+          logger.warn("Unable to determine the IP address of '%s', agent 
registration may fail (agent=%s)", 
+                      self.hostname, prettyData)
+                
         ret = self.sendRequest(self.registerUrl, data)
-        exitstatus = 0
+        
         # exitstatus is a code of error which was rised on server side.
         # exitstatus = 0 (OK - Default)
-        # exitstatus = 1 (Registration failed because
-        #                different version of agent and server)
+        # exitstatus = 1 (Registration failed because different version of 
agent and server)
+        exitstatus = 0
         if 'exitstatus' in ret.keys():
           exitstatus = int(ret['exitstatus'])
-        # log - message, which will be printed to agents  log  
-        if 'log' in ret.keys():
-          log = ret['log']
+                
         if exitstatus == 1:
+          # log - message, which will be printed to agents log  
+          if 'log' in ret.keys():
+            log = ret['log']          
+          
           logger.error(log)
           self.isRegistered = False
           self.repeatRegistration=False
           return ret
-        logger.info("Registered with the server with " + pprint.pformat(ret))
-        print("Registered with the server")
-        self.responseId= int(ret['responseId'])
+        
+        logger.info("Registration Successful (response=%s)", 
pprint.pformat(ret))
+
+        self.responseId = int(ret['responseId'])
         self.isRegistered = True
         if 'statusCommands' in ret.keys():
           logger.info("Got status commands on registration " + 
pprint.pformat(ret['statusCommands']) )
@@ -119,10 +132,10 @@ class Controller(threading.Thread):
         self.repeatRegistration=False
         self.isRegistered = False
         return
-      except Exception, err:
+      except Exception:
         # try a reconnect only after a certain amount of random time
         delay = randint(0, self.range)
-        logger.info("Unable to connect to: " + self.registerUrl, exc_info = 
True)
+        logger.error("Unable to connect to: " + self.registerUrl, exc_info = 
True)
         """ Sleeping for {0} seconds and then retrying again """.format(delay)
         time.sleep(delay)
         pass
@@ -135,7 +148,7 @@ class Controller(threading.Thread):
     """ Put the required actions into the Queue """
     """ Verify if the action is to reboot or not """
     if not commands:
-      logger.debug("No commands from the server : " + pprint.pformat(commands))
+      logger.debug("No commands received from %s", self.serverHostname)
     else:
       """Only add to the queue if not empty list """
       self.actionQueue.put(commands)
@@ -143,7 +156,7 @@ class Controller(threading.Thread):
 
   def addToStatusQueue(self, commands):
     if not commands:
-      logger.debug("No status commands from the server : " + 
pprint.pformat(commands))
+      logger.debug("No status commands received from %s", self.serverHostname)
     else:
       if not LiveStatus.SERVICES:
         self.updateComponents(commands[0]['clusterName'])
@@ -171,15 +184,28 @@ class Controller(threading.Thread):
         if not retry:
           data = json.dumps(
               self.heartbeat.build(self.responseId, int(hb_interval), 
self.hasMappedComponents))
-          logger.debug("Sending request: " + data)
           pass
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
-        response = self.sendRequest(self.heartbeatUrl, data)
 
-        logger.debug('Got server response: ' + pprint.pformat(response))
+        if logger.isEnabledFor(logging.DEBUG):
+          logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, 
data)
+        
+        response = self.sendRequest(self.heartbeatUrl, data)
         
-        serverId=int(response['responseId'])
+        exitStatus = 0
+        if 'exitstatus' in response.keys():
+          exitStatus = int(response['exitstatus'])   
+        
+        if exitStatus != 0:
+          raise Exception(response)
+        
+        serverId = int(response['responseId'])
+
+        if logger.isEnabledFor(logging.DEBUG):
+          logger.debug('Heartbeat response (id = %s): %s', serverId, 
pprint.pformat(response))
+        else:
+          logger.info('Heartbeat response received (id = %s)', serverId)       
         
 
         if 'hasMappedComponents' in response.keys():
           self.hasMappedComponents = response['hasMappedComponents'] != False
@@ -192,7 +218,7 @@ class Controller(threading.Thread):
             self.repeatRegistration = True
             return
 
-        if serverId!=self.responseId+1:
+        if serverId != self.responseId + 1:
           logger.error("Error in responseId sequence - restarting")
           self.restartAgent()
         else:
@@ -201,19 +227,21 @@ class Controller(threading.Thread):
         if 'executionCommands' in response.keys():
           self.addToQueue(response['executionCommands'])
           pass
+        
         if 'statusCommands' in response.keys():
           self.addToStatusQueue(response['statusCommands'])
           pass
+        
         if "true" == response['restartAgent']:
-          logger.error("Got restartAgent command")
+          logger.error("Received the restartAgent command")
           self.restartAgent()
         else:
-          logger.info("No commands sent from the Server.")
+          logger.info("No commands sent from %s", self.serverHostname)
           pass
 
         if retry:
-          print("Reconnected to the server")
-          logger.info("Reconnected to the server")
+          logger.info("Reconnected to %s", self.heartbeatUrl)
+          
         retry=False
         certVerifFailed = False
         self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
@@ -227,18 +255,29 @@ class Controller(threading.Thread):
         #randomize the heartbeat
         delay = randint(0, self.range)
         time.sleep(delay)
+        
         if "code" in err:
           logger.error(err.code)
         else:
-          logger.error("Unable to connect to: " + self.heartbeatUrl + " due to 
" + str(err))
-          logger.debug("Details: " + str(err), exc_info=True)
+          logException = False
+          if logger.isEnabledFor(logging.DEBUG):
+            logException = True
+          
+          exceptionMessage = str(err)
+          errorMessage = "Unable to reconnect to {0} (attempts={1}, 
details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, 
exceptionMessage)
+          
           if not retry:
-            print("Connection to the server was lost. Reconnecting...")
+            errorMessage = "Connection to {0} was lost 
(details={1})".format(self.serverHostname, exceptionMessage)
+          
+          logger.error(errorMessage, exc_info=logException)
+            
           if 'certificate verify failed' in str(err) and not certVerifFailed:
-            print("Server certificate verify failed. Did you regenerate server 
certificate?")
+            logger.warn("Server certificate verify failed. Did you regenerate 
server certificate?")
             certVerifFailed = True
+            
         self.cachedconnect = None # Previous connection is broken now
         retry=True
+        
       # Sleep for some time
       timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
                 - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
@@ -268,11 +307,13 @@ class Controller(threading.Thread):
   def registerAndHeartbeat(self):
     registerResponse = self.registerWithServer()
     message = registerResponse['response']
-    logger.info("Response from server = " + message)
+    logger.info("Registration response from %s was %s", self.serverHostname, 
message)
+    
     if self.isRegistered:
       # Process callbacks
       for callback in self.registration_listeners:
         callback()
+        
       time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
       self.heartbeatWithServer()
 
@@ -281,17 +322,17 @@ class Controller(threading.Thread):
     pass
 
   def sendRequest(self, url, data):
+    response = None
+    
     try:
       if self.cachedconnect is None: # Lazy initialization
         self.cachedconnect = security.CachedHTTPSConnection(self.config)
-      req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
-      response = None
+      req = urllib2.Request(url, data, {'Content-Type': 'application/json'})   
   
       response = self.cachedconnect.request(req)
       return json.loads(response)
-    except Exception:
+    except Exception, exception:
       if response is None:
-        err_msg = 'Request failed! Data: ' + str(data)
-        logger.warn(err_msg)
+        err_msg = 'Request to {0} failed due to {1}'.format(url, 
str(exception))
         return {'exitstatus': 1, 'log': err_msg}
       else:
         err_msg = ('Response parsing failed! Request data: ' + str(data)
@@ -301,8 +342,10 @@ class Controller(threading.Thread):
 
   def updateComponents(self, cluster_name):
     logger.info("Updating components map of cluster " + cluster_name)
-    response = self.sendRequest(self.componentsUrl + cluster_name, None)
-    logger.debug("Response from server = " + str(response))
+    
+    response = self.sendRequest(self.componentsUrl + cluster_name, None)    
+    logger.debug("Response from %s was %s", self.serverHostname, str(response))
+    
     for service, components in response['components'].items():
       LiveStatus.SERVICES.append(service)
       for component, category in components.items():

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py 
b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index 6c7543e..65a7a33 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -58,6 +58,7 @@ class Heartbeat:
     commandsInProgress = False
     if not self.actionQueue.commandQueue.empty():
       commandsInProgress = True
+      
     if len(queueResult) != 0:
       heartbeat['reports'] = queueResult['reports']
       heartbeat['componentStatus'] = queueResult['componentStatus']
@@ -70,23 +71,26 @@ class Heartbeat:
     if int(id) == 0:
       componentsMapped = False
 
-    logger.info("Sending heartbeat with response id: " + str(id) + " and "
-                "timestamp: " + str(timestamp) +
-                ". Command(s) in progress: " + repr(commandsInProgress) +
-                ". Components mapped: " + repr(componentsMapped))
-    logger.debug("Heartbeat : " + pformat(heartbeat))
+    logger.info("Building Heartbeat: {responseId = %s, timestamp = %s, 
commandsInProgress = %s, componentsMapped = %s}", 
+        str(id), str(timestamp), repr(commandsInProgress), 
repr(componentsMapped))
+    
+    if logger.isEnabledFor(logging.DEBUG):
+      logger.debug("Heartbeat: %s", pformat(heartbeat))
 
     if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) == 
0:
       hostInfo = HostInfo(self.config)
       nodeInfo = { }
+      
       # for now, just do the same work as registration
       # this must be the last step before returning heartbeat
       hostInfo.register(nodeInfo, componentsMapped, commandsInProgress)
       heartbeat['agentEnv'] = nodeInfo
-      logger.debug("agentEnv : " + str(nodeInfo))
       mounts = Hardware.osdisks()
       heartbeat['mounts'] = mounts
-      logger.debug("mounts : " + str(mounts))
+            
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.debug("agentEnv: %s", str(nodeInfo))
+        logger.debug("mounts: %s", str(mounts))
 
     return heartbeat
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py 
b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
index 49cea62..49189c8 100644
--- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
+++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
@@ -58,7 +58,6 @@ class LiveStatus:
     """
     global SERVICES, CLIENT_COMPONENTS, COMPONENTS, LIVE_STATUS, DEAD_STATUS
 
-    livestatus = None
     component = {"serviceName" : self.service, "componentName" : 
self.component}
     if forsed_component_status: # If already determined
       status = forsed_component_status  # Nothing to do
@@ -73,7 +72,7 @@ class LiveStatus:
         logger.warn("There is no service to pid mapping for " + self.component)
       status = self.LIVE_STATUS if serviceStatus else self.DEAD_STATUS
 
-    livestatus ={"componentName" : self.component,
+    livestatus = {"componentName" : self.component,
                  "msg" : "",
                  "status" : status,
                  "clusterName" : self.cluster,
@@ -81,6 +80,7 @@ class LiveStatus:
                  "stackVersion": self.versionsHandler.
                  read_stack_version(self.component)
     }
+    
     active_config = 
self.actualConfigHandler.read_actual_component(self.component)
     if not active_config is None:
       livestatus['configurationTags'] = active_config

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/NetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py 
b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
index ece7b8f..42d3875 100644
--- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py
+++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
@@ -39,18 +39,22 @@ class NetUtil:
     """Try to connect to a given url. Result is True if url returns HTTP code 
200, in any other case
     (like unreachable server or wrong HTTP code) result will be False
     """
-    logger.info("Connecting to the following url " + url);
+    logger.info("Connecting to " + url);
+    
     try:
       parsedurl = urlparse(url)
       ca_connection = httplib.HTTPSConnection(parsedurl[1])
-      ca_connection.request("GET", parsedurl[2])
+      ca_connection.request("HEAD", parsedurl[2])
       response = ca_connection.getresponse()  
       status = response.status    
-      logger.info("Calling url received " + str(status))
       
-      if status == 200: 
+      requestLogMessage = "HEAD %s -> %s"
+      
+      if status == 200:
+        logger.debug(requestLogMessage, url, str(status) ) 
         return True
       else: 
+        logger.warning(requestLogMessage, url, str(status) )
         return False
     except SSLError as slerror:
       logger.error(str(slerror))
@@ -69,7 +73,8 @@ class NetUtil:
     Returns count of retries
     """
     if logger is not None:
-      logger.info("DEBUG: Trying to connect to the server at " + server_url)
+      logger.debug("Trying to connect to %s", server_url)
+      
     retries = 0
     while (max_retries == -1 or retries < max_retries) and not 
self.DEBUG_STOP_RETRIES_FLAG:
       server_is_up = 
self.checkURL(self.SERVER_STATUS_REQUEST.format(server_url))
@@ -77,7 +82,7 @@ class NetUtil:
         break
       else:
         if logger is not None:
-          logger.info('Server at {0} is not reachable, sleeping for {1} 
seconds...'.format(server_url,
+          logger.warn('Server at {0} is not reachable, sleeping for {1} 
seconds...'.format(server_url,
             self.CONNECT_SERVER_RETRY_INTERVAL_SEC))
         retries += 1
         time.sleep(self.CONNECT_SERVER_RETRY_INTERVAL_SEC)

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py 
b/ambari-agent/src/main/python/ambari_agent/main.py
index 78dde9e..7ea70ea 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -18,7 +18,6 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 '''
 
-import logging
 import logging.handlers
 import signal
 from optparse import OptionParser
@@ -30,12 +29,11 @@ import ConfigParser
 import ProcessHelper
 from Controller import Controller
 import AmbariConfig
-from security import CertificateManager
 from NetUtil import NetUtil
 from PingPortListener import PingPortListener
-import security
 import hostname
 from DataCleaner import DataCleaner
+import socket
 
 logger = logging.getLogger()
 formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
@@ -222,9 +220,14 @@ def main():
 
   update_log_level(config)
 
-  server_url = 'https://' + config.get('server', 'hostname') + ':' + 
config.get('server', 'url_port')
-  print("Connecting to the server at " + server_url + "...")
-  logger.info('Connecting to the server at: ' + server_url)
+  server_hostname = config.get('server', 'hostname')
+  server_url = 'https://' + server_hostname + ':' + config.get('server', 
'url_port') 
+  
+  try:
+    server_ip = socket.gethostbyname(server_hostname)
+    logger.info('Connecting to Ambari server at %s (%s)', server_url, 
server_ip)
+  except socket.error:
+    logger.warn("Unable to determine the IP address of the Ambari server 
'%s'", server_hostname)  
 
   # Wait until server is reachable
   netutil = NetUtil()

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py 
b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 30e03c4..dd92e06 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -307,7 +307,7 @@ class TestController(unittest.TestCase):
 
     conMock = MagicMock()
     security_mock.CachedHTTPSConnection.return_value = conMock
-    url = "url"
+    url = "http://ambari.apache.org:8081/agent";
     data = "data"
     requestMock.return_value = "request"
 
@@ -329,9 +329,10 @@ class TestController(unittest.TestCase):
                                          + '; Response: {invalid_object}')}
     self.assertEqual(actual, expected)
 
-    conMock.request.side_effect = Exception()
+    exceptionMessage = "Connection Refused"
+    conMock.request.side_effect = Exception(exceptionMessage)
     actual = self.controller.sendRequest(url, data)
-    expected = {'exitstatus': 1, 'log': 'Request failed! Data: ' + data}
+    expected = {'exitstatus': 1, 'log': 'Request to ' + url + ' failed due to 
' + exceptionMessage}
 
     self.assertEqual(actual, expected)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dfedb031/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py 
b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index afe9b59..7cf14b6 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -19,14 +19,14 @@ limitations under the License.
 '''
 import StringIO
 import sys
-from mock.mock import MagicMock, patch, ANY
 import unittest
 import logging
 import signal
-import ConfigParser
 import os
+import socket
 import tempfile
-from optparse import OptionParser
+
+from mock.mock import MagicMock, patch, ANY
 
 with patch("platform.linux_distribution", return_value = 
('Suse','11','Final')):
   from ambari_agent import NetUtil, security
@@ -226,6 +226,7 @@ class TestMain(unittest.TestCase):
     os.remove(tmpoutfile)
 
 
+  @patch.object(socket, "gethostbyname")
   @patch.object(main, "setup_logging")
   @patch.object(main, "bind_signal_handlers")
   @patch.object(main, "stop_agent")
@@ -241,11 +242,12 @@ class TestMain(unittest.TestCase):
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
   @patch.object(PingPortListener,"start")
-  @patch.object(PingPortListener,"__init__")
+  @patch.object(PingPortListener,"__init__")  
   def test_main(self, ping_port_init_mock, ping_port_start_mock, 
data_clean_init_mock,data_clean_start_mock,
                 parse_args_mock, join_mock, start_mock, Controller_init_mock, 
try_to_connect_mock,
                 update_log_level_mock, daemonize_mock, 
perform_prestart_checks_mock,
-                resolve_ambari_config_mock, stop_mock, 
bind_signal_handlers_mock, setup_logging_mock):
+                resolve_ambari_config_mock, stop_mock, 
bind_signal_handlers_mock, 
+                setup_logging_mock, socket_mock):
     data_clean_init_mock.return_value = None
     Controller_init_mock.return_value = None
     ping_port_init_mock.return_value = None

Reply via email to