Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 dbde2f9b0 -> f4e6e59dc


AMBARI-16036. Add logging for problems in ambari-agent Controller and 
ActionQueue (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f4e6e59d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f4e6e59d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f4e6e59d

Branch: refs/heads/branch-2.2
Commit: f4e6e59dc8231aaea46d450efe2ea9a180e27367
Parents: dbde2f9
Author: Andrew Onishuk <[email protected]>
Authored: Fri Apr 22 12:57:43 2016 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Fri Apr 22 12:57:43 2016 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 68 +++++++++++---------
 .../src/main/python/ambari_agent/Controller.py  | 36 +++++++----
 .../python/ambari_agent/HeartbeatHandlers.py    |  1 +
 .../test/python/ambari_agent/TestActionQueue.py | 22 +++----
 4 files changed, 71 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c4d0b5e..0734bd7 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -144,37 +144,43 @@ class ActionQueue(threading.Thread):
       self.customServiceOrchestrator.cancel_command(task_id, reason)
 
   def run(self):
-    while not self.stopped():
-      self.processBackgroundQueueSafeEmpty();
-      self.processStatusCommandQueueSafeEmpty();
-      try:
-        if self.parallel_execution == 0:
-          command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
-          self.process_command(command)
-        else:
-          # If parallel execution is enabled, just kick off all available
-          # commands using separate threads
-          while (True):
+    try:
+      while not self.stopped():
+        self.processBackgroundQueueSafeEmpty();
+        self.processStatusCommandQueueSafeEmpty();
+        try:
+          if self.parallel_execution == 0:
             command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
-            # If command is not retry_enabled then do not start them in 
parallel
-            # checking just one command is enough as all commands for a stage 
is sent
-            # at the same time and retry is only enabled for initial 
start/install
-            retryAble = False
-            if 'commandParams' in command and 'command_retry_enabled' in 
command['commandParams']:
-              retryAble = command['commandParams']['command_retry_enabled'] == 
"true"
-            if retryAble:
-              logger.info("Kicking off a thread for the command, id=" +
-                          str(command['commandId']) + " taskId=" + 
str(command['taskId']))
-              t = threading.Thread(target=self.process_command, 
args=(command,))
-              t.daemon = True
-              t.start()
-            else:
-              self.process_command(command)
-              break;
+            self.process_command(command)
+          else:
+            # If parallel execution is enabled, just kick off all available
+            # commands using separate threads
+            while (True):
+              command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
+              # If command is not retry_enabled then do not start them in 
parallel
+              # checking just one command is enough as all commands for a 
stage is sent
+              # at the same time and retry is only enabled for initial 
start/install
+              retryAble = False
+              if 'commandParams' in command and 'command_retry_enabled' in 
command['commandParams']:
+                retryAble = command['commandParams']['command_retry_enabled'] 
== "true"
+              if retryAble:
+                logger.info("Kicking off a thread for the command, id=" +
+                            str(command['commandId']) + " taskId=" + 
str(command['taskId']))
+                t = threading.Thread(target=self.process_command, 
args=(command,))
+                t.daemon = True
+                t.start()
+              else:
+                self.process_command(command)
+                break;
+              pass
             pass
+        except (Queue.Empty):
           pass
-      except (Queue.Empty):
-        pass
+    except:
+      logger.exception("ActionQueue thread failed with exception:")
+      raise
+    
+    logger.info("ActionQueue thread has successfully finished")
 
   def processBackgroundQueueSafeEmpty(self):
     while not self.backgroundCommandQueue.empty():
@@ -217,10 +223,8 @@ class ActionQueue(threading.Thread):
         self.execute_status_command(command)
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
-    except Exception, err:
-      # Should not happen
-      traceback.print_exc()
-      logger.warn(err)
+    except Exception:
+      logger.exception("Exception while processing {0} 
command".format(commandType))
 
   def tasks_in_progress_or_pending(self):
     return_val = False

http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py 
b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 9f6d32b..e149789 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -332,6 +332,7 @@ class Controller(threading.Thread):
       except ssl.SSLError:
         self.repeatRegistration=False
         self.isRegistered = False
+        logger.exception("SSLError while trying to heartbeat.")
         return
       except Exception, err:
         if "code" in err:
@@ -369,19 +370,26 @@ class Controller(threading.Thread):
         self.DEBUG_STOP_HEARTBEATING=True
 
   def run(self):
-    self.actionQueue = ActionQueue(self.config, controller=self)
-    self.actionQueue.start()
-    self.register = Register(self.config)
-    self.heartbeat = Heartbeat(self.actionQueue, self.config, 
self.alert_scheduler_handler.collector())
-
-    opener = urllib2.build_opener()
-    urllib2.install_opener(opener)
-
-    while True:
-      self.repeatRegistration = False
-      self.registerAndHeartbeat()
-      if not self.repeatRegistration:
-        break
+    try:
+      self.actionQueue = ActionQueue(self.config, controller=self)
+      self.actionQueue.start()
+      self.register = Register(self.config)
+      self.heartbeat = Heartbeat(self.actionQueue, self.config, 
self.alert_scheduler_handler.collector())
+  
+      opener = urllib2.build_opener()
+      urllib2.install_opener(opener)
+  
+      while True:
+        self.repeatRegistration = False
+        self.registerAndHeartbeat()
+        if not self.repeatRegistration:
+          logger.info("Finished heartbeating and registering cycle")
+          break
+    except:
+      logger.exception("Controller thread failed with exception:")
+      raise
+    
+    logger.info("Controller thread has successfully finished")
 
   def registerAndHeartbeat(self):
     registerResponse = self.registerWithServer()
@@ -402,6 +410,8 @@ class Controller(threading.Thread):
 
         time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
         self.heartbeatWithServer()
+      else:
+        logger.info("Registration response from {0} didn't contain 'response' 
as a key".format(self.serverHostname))
 
   def restartAgent(self):
     ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
index 4347595..81ccd22 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
@@ -77,6 +77,7 @@ class HeartbeatStopHandlersWindows(HeartbeatStopHandlers):
 
 def signal_handler(signum, frame):
   global _handler
+  logger.info("Ambari-agent received {0} signal, stopping...".format(signum))
   _handler.set_stop()
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py 
b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index a81bc55..ffb6419 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -284,11 +284,11 @@ class TestActionQueue(TestCase):
 
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))
-  @patch("traceback.print_exc")
+  @patch("logging.RootLogger.exception")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(ActionQueue, "execute_status_command")
   def test_process_command(self, execute_status_command_mock,
-                           execute_command_mock, print_exc_mock):
+                           execute_command_mock, log_exc_mock):
     dummy_controller = MagicMock()
     config = AmbariConfig()
     config.set('agent', 'tolerate_download_failures', "true")
@@ -306,42 +306,42 @@ class TestActionQueue(TestCase):
     actionQueue.process_command(wrong_command)
     self.assertFalse(execute_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
+    self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
     # Try normal execution
     actionQueue.process_command(execution_command)
     self.assertTrue(execute_command_mock.called)
     self.assertFalse(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
+    self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
 
     actionQueue.process_command(status_command)
     self.assertFalse(execute_command_mock.called)
     self.assertTrue(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
+    self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
     execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
 
     # Try exception to check proper logging
     def side_effect(self):
       raise Exception("TerribleException")
     execute_command_mock.side_effect = side_effect
     actionQueue.process_command(execution_command)
-    self.assertTrue(print_exc_mock.called)
+    self.assertTrue(log_exc_mock.called)
 
-    print_exc_mock.reset_mock()
+    log_exc_mock.reset_mock()
 
     execute_status_command_mock.side_effect = side_effect
     actionQueue.process_command(execution_command)
-    self.assertTrue(print_exc_mock.called)
+    self.assertTrue(log_exc_mock.called)
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")

Reply via email to