AMBARI-19342. Race condition in agent on command reschedule. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/937c56b5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/937c56b5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/937c56b5

Branch: refs/heads/branch-2.5
Commit: 937c56b5ebb29a12b7dd7186cc0801293472f763
Parents: 1689df7
Author: Myroslav Papirkovskyi <[email protected]>
Authored: Tue Jan 3 18:41:42 2017 +0200
Committer: Myroslav Papirkovskyi <[email protected]>
Committed: Tue Jan 3 20:23:20 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py    | 14 ++++++++------
 .../src/main/python/ambari_agent/Controller.py     | 17 +++++++++++------
 2 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/937c56b5/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index ea016e5..a9567c4 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -87,6 +87,7 @@ class ActionQueue(threading.Thread):
     self.parallel_execution = config.get_parallel_exec_option()
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
+    self.lock = threading.Lock()
 
   def stop(self):
     self._stop.set()
@@ -347,12 +348,13 @@ class ActionQueue(threading.Thread):
 
     # do not fail task which was rescheduled from server
     if command_canceled:
-      with self.commandQueue.mutex:
-        for com in self.commandQueue.queue:
-          if com['taskId'] == command['taskId']:
-            logger.info('Command with taskId = {cid} was rescheduled by server. '
-                        'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
-            return
+      with self.lock:
+        with self.commandQueue.mutex:
+          for com in self.commandQueue.queue:
+            if com['taskId'] == command['taskId']:
+              logger.info('Command with taskId = {cid} was rescheduled by server. '
+                          'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
+              return
 
     # final result to stdout
     commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n'

http://git-wip-us.apache.org/repos/asf/ambari/blob/937c56b5/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 672885e..beeaad9 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -358,13 +358,18 @@ class Controller(threading.Thread):
         self.cluster_configuration.update_configurations_from_heartbeat(response)
 
         response_keys = response.keys()
-        if 'cancelCommands' in response_keys:
-          self.cancelCommandInQueue(response['cancelCommands'])
 
-        if 'executionCommands' in response_keys:
-          execution_commands = response['executionCommands']
-          self.recovery_manager.process_execution_commands(execution_commands)
-          self.addToQueue(execution_commands)
+        # There is a case where a canceled task can be processed in ActionQueue.execute before the rescheduled task is added to the queue;
+        # this causes the command to fail instead of having its result suppressed,
+        # so canceling commands and queueing rescheduled commands must happen atomically.
+        with self.actionQueue.lock:
+          if 'cancelCommands' in response_keys:
+            self.cancelCommandInQueue(response['cancelCommands'])
+
+          if 'executionCommands' in response_keys:
+            execution_commands = response['executionCommands']
+            self.recovery_manager.process_execution_commands(execution_commands)
+            self.addToQueue(execution_commands)
 
         if 'statusCommands' in response_keys:
           # try storing execution command details and desired state

Reply via email to