This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 451cdacb6bd6549de297ef4c995111a6e01832f6
Author: Andrew Onishuk <aonis...@hortonworks.com>
AuthorDate: Thu Jun 14 12:33:52 2018 +0300

    AMBARI-24097. Canceling task during blueprint install results in agent not responding to any other tasks (aonishuk)
---
 ambari-agent/src/main/python/ambari_agent/ActionQueue.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index f0c996b..6ee3ec0 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -83,6 +83,8 @@ class ActionQueue(threading.Thread):
     self.tmpdir = self.config.get('agent', 'prefix')
     self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
     self.parallel_execution = self.config.get_parallel_exec_option()
+    self.taskIdsToCancel = set()
+    self.cancelEvent = threading.Event()
     self.component_status_executor = initializer_module.component_status_executor
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
@@ -133,6 +135,8 @@ class ActionQueue(threading.Thread):
 
       # Kill if in progress
       self.customServiceOrchestrator.cancel_command(task_id, reason)
+      self.taskIdsToCancel.add(task_id)
+      self.cancelEvent.set()
 
   def run(self):
     while not self.stop_event.is_set():
@@ -275,6 +279,13 @@ class ActionQueue(threading.Thread):
                 format(taskId=taskId, retryAble=retryAble, retryDuration=retryDuration, log_command_output=log_command_output))
     command_canceled = False
     while retryDuration >= 0:
+      if taskId in self.taskIdsToCancel:
+        logger.info('Command with taskId = {0} canceled'.format(taskId))
+        command_canceled = True
+
+        self.taskIdsToCancel.discard(taskId)
+        break
+
       numAttempts += 1
       start = 0
       if retryAble:
@@ -303,6 +314,7 @@ class ActionQueue(threading.Thread):
           if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL):
             logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
             command_canceled = True
+            self.taskIdsToCancel.discard(taskId)
             break
 
       if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
@@ -316,7 +328,7 @@ class ActionQueue(threading.Thread):
           command['agentLevelParams'] = {}
 
         command['agentLevelParams']['commandBeingRetried'] = "true"
-        time.sleep(delay)
+        self.cancelEvent.wait(delay) # wake up if something was canceled
         continue
       else:
         logger.info("Quit retrying for command with taskId = {cid}. Status: {status}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}"

-- 
To stop receiving notification emails like this one, please contact aonis...@apache.org.