AMBARI-19342. Race condition in agent on command reschedule. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/315691fc Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/315691fc Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/315691fc Branch: refs/heads/branch-dev-patch-upgrade Commit: 315691fc790a94e4830fac0ccbb92046bd9a3719 Parents: c458134 Author: Myroslav Papirkovskyi <[email protected]> Authored: Tue Jan 3 18:41:42 2017 +0200 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Tue Jan 3 20:18:35 2017 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 14 ++++++++------ .../src/main/python/ambari_agent/Controller.py | 17 +++++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/315691fc/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 793eeba..d70b344 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -89,6 +89,7 @@ class ActionQueue(threading.Thread): self.parallel_execution = config.get_parallel_exec_option() if self.parallel_execution == 1: logger.info("Parallel execution is enabled, will execute agent commands in parallel") + self.lock = threading.Lock() def stop(self): self._stop.set() @@ -352,12 +353,13 @@ class ActionQueue(threading.Thread): # do not fail task which was rescheduled from server if command_canceled: - with self.commandQueue.mutex: - for com in self.commandQueue.queue: - if com['taskId'] == command['taskId']: - logger.info('Command with taskId = {cid} was rescheduled by server. ' - 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId)) - return + with self.lock: + with self.commandQueue.mutex: + for com in self.commandQueue.queue: + if com['taskId'] == command['taskId']: + logger.info('Command with taskId = {cid} was rescheduled by server. ' + 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId)) + return # final result to stdout commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n' http://git-wip-us.apache.org/repos/asf/ambari/blob/315691fc/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index a762d3f..56b1992 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -360,13 +360,18 @@ class Controller(threading.Thread): self.cluster_configuration.update_configurations_from_heartbeat(response) response_keys = response.keys() - if 'cancelCommands' in response_keys: - self.cancelCommandInQueue(response['cancelCommands']) - if 'executionCommands' in response_keys: - execution_commands = response['executionCommands'] - self.recovery_manager.process_execution_commands(execution_commands) - self.addToQueue(execution_commands) + # there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue + # this can cause command failure instead result suppression + # so canceling and putting rescheduled commands should be executed atomically + with self.actionQueue.lock: + if 'cancelCommands' in response_keys: + self.cancelCommandInQueue(response['cancelCommands']) + + if 'executionCommands' in response_keys: + execution_commands = response['executionCommands'] + self.recovery_manager.process_execution_commands(execution_commands) + self.addToQueue(execution_commands) if 'statusCommands' in response_keys: # try storing execution command details and desired state
