Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 033989e7f -> 83adcf667
AMBARI-19051. Stage is sometimes marked as failed on command reschedule. (mpapirkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/83adcf66 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/83adcf66 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/83adcf66 Branch: refs/heads/branch-2.5 Commit: 83adcf66763cc4b398388723689ea164527bafba Parents: 033989e Author: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com> Authored: Thu Dec 1 18:48:25 2016 +0200 Committer: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com> Committed: Fri Dec 2 09:49:23 2016 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 11 +++++ .../test/python/ambari_agent/TestActionQueue.py | 48 ++++++++++++++++++++ 2 files changed, 59 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/83adcf66/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 3ec0621..cc10728 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -294,6 +294,7 @@ class ActionQueue(threading.Thread): logger.info("Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec) = {retryDuration}, log_output = {log_command_output}". 
format(taskId=taskId, retryAble=retryAble, retryDuration=retryDuration, log_command_output=log_command_output)) + command_canceled = False while retryDuration >= 0: numAttempts += 1 start = 0 @@ -322,6 +323,7 @@ class ActionQueue(threading.Thread): status = self.FAILED_STATUS if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL): logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId)) + command_canceled = True break if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0: @@ -338,6 +340,15 @@ class ActionQueue(threading.Thread): .format(cid=taskId, status=status, retryAble=retryAble, retryDuration=retryDuration, delay=delay)) break + # do not fail task which was rescheduled from server + if command_canceled: + with self.commandQueue.mutex: + for com in self.commandQueue.queue: + if com['taskId'] == command['taskId']: + logger.info('Command with taskId = {cid} was rescheduled by server. ' + 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId)) + return + # final result to stdout commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n' logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == self.COMPLETED_STATUS else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=numAttempts)) http://git-wip-us.apache.org/repos/asf/ambari/blob/83adcf66/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 7d04d42..d5dde8b 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ 
-27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading import sys from threading import Thread import copy +import signal from mock.mock import patch, MagicMock, call from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator @@ -703,6 +704,53 @@ class TestActionQueue(TestCase): report = actionQueue.result() self.assertEqual(len(report['reports']), 0) + def test_cancel_with_reschedule_command(self): + config = AmbariConfig() + tempdir = tempfile.gettempdir() + config.set('agent', 'prefix', tempdir) + config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") + config.set('agent', 'tolerate_download_failures', "true") + dummy_controller = MagicMock() + actionQueue = ActionQueue(config, dummy_controller) + unfreeze_flag = threading.Event() + python_execution_result_dict = { + 'stdout': 'out', + 'stderr': 'stderr', + 'structuredOut' : '', + 'status' : '', + 'exitcode' : -signal.SIGTERM + } + + def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): + unfreeze_flag.wait() + return python_execution_result_dict + def patched_aq_execute_command(command): + # We have to perform patching for separate thread in the same thread + with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: + runCommand_mock.side_effect = side_effect + actionQueue.execute_command(command) + + # We call method in a separate thread + execution_thread = Thread(target = patched_aq_execute_command , + args = (self.datanode_install_command, )) + execution_thread.start() + # check in progress report + # wait until ready + while True: + time.sleep(0.1) + report = actionQueue.result() + if len(report['reports']) != 0: + break + + unfreeze_flag.set() + # wait until ready + while len(report['reports']) != 0: + time.sleep(0.1) + report = actionQueue.result() + + # check report + self.assertEqual(len(report['reports']), 0) + @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) 
@patch.object(CustomServiceOrchestrator, "runCommand")