Repository: ambari Updated Branches: refs/heads/trunk 1a7c781ce -> 995fc0be4
Revert "AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty)" This reverts commit 1a7c781ce5340e332dc182c2c93d010cf0eec902. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/995fc0be Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/995fc0be Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/995fc0be Branch: refs/heads/trunk Commit: 995fc0be488d45bd9b44e412e034565267649a1a Parents: 1a7c781 Author: Sumit Mohanty <[email protected]> Authored: Mon Apr 11 15:48:30 2016 -0700 Committer: Sumit Mohanty <[email protected]> Committed: Mon Apr 11 15:48:30 2016 -0700 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 22 +- .../test/python/ambari_agent/TestActionQueue.py | 48 +- .../python/ambari_agent/TestActionQueue.py.orig | 1158 ------------------ 3 files changed, 7 insertions(+), 1221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/995fc0be/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index ccae62c..d603566 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -156,23 +156,11 @@ class ActionQueue(threading.Thread): # commands using separate threads while (True): command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) - # If command is not retry_enabled then do not start them in parallel - # checking just one command is enough as all commands for a stage is sent - # at the same time and retry is only enabled for initial start/install - retryAble = False - if 'command_retry_enabled' in command['commandParams']: - retryAble = command['commandParams']['command_retry_enabled'] == "true" - if retryAble: - logger.info("Kicking off a thread for the command, id=" + - str(command['commandId']) + " taskId=" + str(command['taskId'])) - t = threading.Thread(target=self.process_command, args=(command,)) - t.daemon = True - t.start() - else: - self.process_command(command) - break; - pass - pass + logger.info("Kicking off a thread for the command, id=" + + str(command['commandId']) + " taskId=" + str(command['taskId'])) + t = threading.Thread(target=self.process_command, args=(command,)) + t.daemon = True + t.start() except (Queue.Empty): pass http://git-wip-us.apache.org/repos/asf/ambari/blob/995fc0be/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 2adf4ed..8bd5ddc 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -59,26 +59,7 @@ class TestActionQueue(TestCase): 'serviceName': u'HDFS', 'hostLevelParams': {}, 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v1' }}, - 'commandParams': { - 'command_retry_enabled': 'true' - } - } - - datanode_install_no_retry_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'DATANODE', - 'roleCommand': u'INSTALL', - 'commandId': '1-1', - 'taskId': 3, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'hostLevelParams': {}, - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v1' }}, - 'commandParams': { - 'command_retry_enabled': 'false' - } + 'configurationTags':{'global' : { 'tag': 'v1' }} } datanode_auto_start_command = { @@ -143,11 +124,8 @@ class TestActionQueue(TestCase): 'taskId': 7, 'clusterName': u'cc', 'serviceName': u'HDFS', - 'hostLevelParams': {}, - 'commandParams': { - 'command_retry_enabled': 'true' + 'hostLevelParams': {} } - } status_command = { "serviceName" : 'HDFS', @@ -905,28 +883,6 @@ class TestActionQueue(TestCase): self.assertEqual(2, process_command_mock.call_count) process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) - @patch("threading.Thread") - @patch.object(AmbariConfig, "get_parallel_exec_option") - @patch.object(ActionQueue, "process_command") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock, - process_command_mock, gpeo_mock, threading_mock): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - config = MagicMock() - gpeo_mock.return_value = 1 - config.get_parallel_exec_option = gpeo_mock - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command]) - self.assertEqual(2, actionQueue.commandQueue.qsize()) - actionQueue.start() - time.sleep(1) - actionQueue.stop() - actionQueue.join() - self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') - self.assertEqual(1, process_command_mock.call_count) - self.assertEqual(0, threading_mock.call_count) - process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) @not_for_platform(PLATFORM_LINUX) @patch("time.sleep") http://git-wip-us.apache.org/repos/asf/ambari/blob/995fc0be/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig deleted file mode 100644 index 8bd5ddc..0000000 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig +++ /dev/null @@ -1,1158 +0,0 @@ -#!/usr/bin/env python - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' -from Queue import Queue - -from unittest import TestCase -from ambari_agent.LiveStatus import LiveStatus -from ambari_agent.ActionQueue import ActionQueue -from ambari_agent.AmbariConfig import AmbariConfig -import os, errno, time, pprint, tempfile, threading -import sys -from threading import Thread -import copy - -from mock.mock import patch, MagicMock, call -from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator -from ambari_agent.PythonExecutor import PythonExecutor -from ambari_agent.ActualConfigHandler import ActualConfigHandler -from ambari_agent.RecoveryManager import RecoveryManager -from ambari_commons import OSCheck -from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX - -import logging - -class TestActionQueue(TestCase): - def setUp(self): - # save original open() method for later use - self.original_open = open - - - def tearDown(self): - sys.stdout = sys.__stdout__ - - logger = logging.getLogger() - - datanode_install_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'DATANODE', - 'roleCommand': u'INSTALL', - 'commandId': '1-1', - 'taskId': 3, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'hostLevelParams': {}, - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v1' }} - } - - datanode_auto_start_command = { - 'commandType': 'AUTO_EXECUTION_COMMAND', - 'role': u'DATANODE', - 'roleCommand': u'START', - 'commandId': '1-1', - 'taskId': 3, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'hostLevelParams': {}, - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v1' }} - } - - datanode_upgrade_command = { - 'commandId': 17, - 'role' : "role", - 'taskId' : "taskId", - 'clusterName' : "clusterName", - 'serviceName' : "serviceName", - 'roleCommand' : 'UPGRADE', - 'hostname' : "localhost.localdomain", - 'hostLevelParams': {}, - 'clusterHostInfo': "clusterHostInfo", - 'commandType': "EXECUTION_COMMAND", - 'configurations':{'global' : {}}, - 'roleParams': {}, - 'commandParams' : { - 'source_stack_version' : 'HDP-1.2.1', - 'target_stack_version' : 'HDP-1.3.0' - } - } - - namenode_install_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'NAMENODE', - 'roleCommand': u'INSTALL', - 'commandId': '1-1', - 'taskId': 4, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'hostLevelParams': {} - } - - snamenode_install_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'SECONDARY_NAMENODE', - 'roleCommand': u'INSTALL', - 'commandId': '1-1', - 'taskId': 5, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'hostLevelParams': {} - } - - hbase_install_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'HBASE', - 'roleCommand': u'INSTALL', - 'commandId': '1-1', - 'taskId': 7, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'hostLevelParams': {} - } - - status_command = { - "serviceName" : 'HDFS', - "commandType" : "STATUS_COMMAND", - "clusterName" : "", - "componentName" : "DATANODE", - 'configurations':{}, - 'hostLevelParams': {} - } - - datanode_restart_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'DATANODE', - 'roleCommand': u'CUSTOM_COMMAND', - 'commandId': '1-1', - 'taskId': 9, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v123' }}, - 'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []} - } - - datanode_restart_command_no_clients_update = { - 'commandType': 'EXECUTION_COMMAND', - 'role': u'DATANODE', - 'roleCommand': u'CUSTOM_COMMAND', - 'commandId': '1-1', - 'taskId': 9, - 'clusterName': u'cc', - 'serviceName': u'HDFS', - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v123' }}, - 'hostLevelParams':{'custom_command': 'RESTART'} - } - - status_command_for_alerts = { - "serviceName" : 'FLUME', - "commandType" : "STATUS_COMMAND", - "clusterName" : "", - "componentName" : "FLUME_HANDLER", - 'configurations':{}, - 'hostLevelParams': {} - } - - retryable_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': 'NAMENODE', - 'roleCommand': 'INSTALL', - 'commandId': '1-1', - 'taskId': 19, - 'clusterName': 'c1', - 'serviceName': 'HDFS', - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v123' }}, - 'commandParams' : { - 'script_type' : 'PYTHON', - 'script' : 'script.py', - 'command_timeout' : '600', - 'jdk_location' : '.', - 'service_package_folder' : '.', - 'command_retry_enabled' : 'true', - 'max_duration_for_retries' : '5' - }, - 'hostLevelParams' : {} - } - - background_command = { - 'commandType': 'BACKGROUND_EXECUTION_COMMAND', - 'role': 'NAMENODE', - 'roleCommand': 'CUSTOM_COMMAND', - 'commandId': '1-1', - 'taskId': 19, - 'clusterName': 'c1', - 'serviceName': 'HDFS', - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v123' }}, - 'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'}, - 'commandParams' : { - 'script_type' : 'PYTHON', - 'script' : 'script.py', - 'command_timeout' : '600', - 'jdk_location' : '.', - 'service_package_folder' : '.' - } - } - cancel_background_command = { - 'commandType': 'EXECUTION_COMMAND', - 'role': 'NAMENODE', - 'roleCommand': 'ACTIONEXECUTE', - 'commandId': '1-1', - 'taskId': 20, - 'clusterName': 'c1', - 'serviceName': 'HDFS', - 'configurations':{'global' : {}}, - 'configurationTags':{'global' : {}}, - 'hostLevelParams':{}, - 'commandParams' : { - 'script_type' : 'PYTHON', - 'script' : 'cancel_background_task.py', - 'before_system_hook_function' : 'fetch_bg_pid_by_taskid', - 'jdk_location' : '.', - 'command_timeout' : '600', - 'service_package_folder' : '.', - 'cancel_policy': 'SIGKILL', - 'cancel_task_id': "19", - } - } - - - @patch.object(AmbariConfig, "get_parallel_exec_option") - @patch.object(ActionQueue, "process_command") - @patch.object(Queue, "get") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock, - get_mock, process_command_mock, get_parallel_exec_option_mock): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - config = MagicMock() - get_parallel_exec_option_mock.return_value = 0 - config.get_parallel_exec_option = get_parallel_exec_option_mock - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.start() - time.sleep(0.1) - actionQueue.stop() - actionQueue.join() - self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') - self.assertTrue(process_command_mock.call_count > 1) - - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch("traceback.print_exc") - @patch.object(ActionQueue, "execute_command") - @patch.object(ActionQueue, "execute_status_command") - def test_process_command(self, execute_status_command_mock, - execute_command_mock, print_exc_mock): - dummy_controller = MagicMock() - config = AmbariConfig() - config.set('agent', 'tolerate_download_failures', "true") - actionQueue = ActionQueue(config, dummy_controller) - execution_command = { - 'commandType' : ActionQueue.EXECUTION_COMMAND, - } - status_command = { - 'commandType' : ActionQueue.STATUS_COMMAND, - } - wrong_command = { - 'commandType' : "SOME_WRONG_COMMAND", - } - # Try wrong command - actionQueue.process_command(wrong_command) - self.assertFalse(execute_command_mock.called) - self.assertFalse(execute_status_command_mock.called) - self.assertFalse(print_exc_mock.called) - - execute_command_mock.reset_mock() - execute_status_command_mock.reset_mock() - print_exc_mock.reset_mock() - # Try normal execution - actionQueue.process_command(execution_command) - self.assertTrue(execute_command_mock.called) - self.assertFalse(execute_status_command_mock.called) - self.assertFalse(print_exc_mock.called) - - execute_command_mock.reset_mock() - execute_status_command_mock.reset_mock() - print_exc_mock.reset_mock() - - actionQueue.process_command(status_command) - self.assertFalse(execute_command_mock.called) - self.assertTrue(execute_status_command_mock.called) - self.assertFalse(print_exc_mock.called) - - execute_command_mock.reset_mock() - execute_status_command_mock.reset_mock() - print_exc_mock.reset_mock() - - # Try exception to check proper logging - def side_effect(self): - raise Exception("TerribleException") - execute_command_mock.side_effect = side_effect - actionQueue.process_command(execution_command) - self.assertTrue(print_exc_mock.called) - - print_exc_mock.reset_mock() - - execute_status_command_mock.side_effect = side_effect - actionQueue.process_command(execution_command) - self.assertTrue(print_exc_mock.called) - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(CustomServiceOrchestrator, "runCommand") - @patch("CommandStatusDict.CommandStatusDict") - @patch.object(ActionQueue, "status_update_callback") - def test_log_execution_commands(self, status_update_callback_mock, - command_status_dict_mock, - cso_runCommand_mock): - custom_service_orchestrator_execution_result_dict = { - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut' : '', - 'exitcode' : 0 - } - cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict - - config = AmbariConfig() - tempdir = tempfile.gettempdir() - config.set('agent', 'prefix', tempdir) - config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") - config.set('agent', 'tolerate_download_failures', "true") - config.set('logging', 'log_command_executes', 1) - dummy_controller = MagicMock() - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.execute_command(self.datanode_restart_command) - report = actionQueue.result() - expected = {'status': 'COMPLETED', - 'configurationTags': {'global': {'tag': 'v123'}}, - 'stderr': 'stderr', - 'stdout': 'out', - 'clusterName': u'cc', - 'structuredOut': '""', - 'roleCommand': u'CUSTOM_COMMAND', - 'serviceName': u'HDFS', - 'role': u'DATANODE', - 'actionId': '1-1', - 'taskId': 9, - 'customCommand': 'RESTART', - 'exitCode': 0} - # Agent caches configurationTags if custom_command RESTART completed - self.assertEqual(len(report['reports']), 1) - self.assertEqual(expected, report['reports'][0]) - - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch("__builtin__.open") - @patch.object(ActionQueue, "status_update_callback") - def test_auto_execute_command(self, status_update_callback_mock, open_mock): - # Make file read calls visible - def open_side_effect(file, mode): - if mode == 'r': - file_mock = MagicMock() - file_mock.read.return_value = "Read from " + str(file) - return file_mock - else: - return self.original_open(file, mode) - open_mock.side_effect = open_side_effect - - config = AmbariConfig() - tempdir = tempfile.gettempdir() - config.set('agent', 'prefix', tempdir) - config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") - config.set('agent', 'tolerate_download_failures', "true") - dummy_controller = MagicMock() - dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) - dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, "", -1) - - actionQueue = ActionQueue(config, dummy_controller) - unfreeze_flag = threading.Event() - python_execution_result_dict = { - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut' : '' - } - - def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): - unfreeze_flag.wait() - return python_execution_result_dict - def patched_aq_execute_command(command): - # We have to perform patching for separate thread in the same thread - with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: - runCommand_mock.side_effect = side_effect - actionQueue.process_command(command) - - python_execution_result_dict['status'] = 'COMPLETE' - python_execution_result_dict['exitcode'] = 0 - self.assertFalse(actionQueue.tasks_in_progress_or_pending()) - # We call method in a separate thread - execution_thread = Thread(target = patched_aq_execute_command , - args = (self.datanode_auto_start_command, )) - execution_thread.start() - # check in progress report - # wait until ready - while True: - time.sleep(0.1) - if actionQueue.tasks_in_progress_or_pending(): - break - # Continue command execution - unfreeze_flag.set() - # wait until ready - check_queue = True - while check_queue: - report = actionQueue.result() - if not actionQueue.tasks_in_progress_or_pending(): - break - time.sleep(0.1) - - self.assertEqual(len(report['reports']), 0) - - ## Test failed execution - python_execution_result_dict['status'] = 'FAILED' - python_execution_result_dict['exitcode'] = 13 - # We call method in a separate thread - execution_thread = Thread(target = patched_aq_execute_command , - args = (self.datanode_auto_start_command, )) - execution_thread.start() - unfreeze_flag.set() - # check in progress report - # wait until ready - while check_queue: - report = actionQueue.result() - if not actionQueue.tasks_in_progress_or_pending(): - break - time.sleep(0.1) - - self.assertEqual(len(report['reports']), 0) - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch("__builtin__.open") - @patch.object(ActionQueue, "status_update_callback") - def test_execute_command(self, status_update_callback_mock, open_mock): - # Make file read calls visible - def open_side_effect(file, mode): - if mode == 'r': - file_mock = MagicMock() - file_mock.read.return_value = "Read from " + str(file) - return file_mock - else: - return self.original_open(file, mode) - open_mock.side_effect = open_side_effect - - config = AmbariConfig() - tempdir = tempfile.gettempdir() - config.set('agent', 'prefix', tempdir) - config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") - config.set('agent', 'tolerate_download_failures', "true") - dummy_controller = MagicMock() - actionQueue = ActionQueue(config, dummy_controller) - unfreeze_flag = threading.Event() - python_execution_result_dict = { - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut' : '' - } - - def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): - unfreeze_flag.wait() - return python_execution_result_dict - def patched_aq_execute_command(command): - # We have to perform patching for separate thread in the same thread - with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: - runCommand_mock.side_effect = side_effect - actionQueue.execute_command(command) - ### Test install/start/stop command ### - ## Test successful execution with configuration tags - python_execution_result_dict['status'] = 'COMPLETE' - python_execution_result_dict['exitcode'] = 0 - # We call method in a separate thread - execution_thread = Thread(target = patched_aq_execute_command , - args = (self.datanode_install_command, )) - execution_thread.start() - # check in progress report - # wait until ready - while True: - time.sleep(0.1) - report = actionQueue.result() - if len(report['reports']) != 0: - break - expected = {'status': 'IN_PROGRESS', - 'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")), - 'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")), - 'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")), - 'clusterName': u'cc', - 'roleCommand': u'INSTALL', - 'serviceName': u'HDFS', - 'role': u'DATANODE', - 'actionId': '1-1', - 'taskId': 3, - 'exitCode': 777} - self.assertEqual(report['reports'][0], expected) - self.assertTrue(actionQueue.tasks_in_progress_or_pending()) - - # Continue command execution - unfreeze_flag.set() - # wait until ready - while report['reports'][0]['status'] == 'IN_PROGRESS': - time.sleep(0.1) - report = actionQueue.result() - # check report - configname = os.path.join(tempdir, 'config.json') - expected = {'status': 'COMPLETED', - 'stderr': 'stderr', - 'stdout': 'out', - 'clusterName': u'cc', - 'structuredOut': '""', - 'roleCommand': u'INSTALL', - 'serviceName': u'HDFS', - 'role': u'DATANODE', - 'actionId': '1-1', - 'taskId': 3, - 'configurationTags': {'global': {'tag': 'v1'}}, - 'exitCode': 0} - self.assertEqual(len(report['reports']), 1) - self.assertEqual(report['reports'][0], expected) - self.assertTrue(os.path.isfile(configname)) - # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE) - self.assertEqual(status_update_callback_mock.call_count, 2) - os.remove(configname) - - # now should not have reports (read complete/failed reports are deleted) - report = actionQueue.result() - self.assertEqual(len(report['reports']), 0) - - ## Test failed execution - python_execution_result_dict['status'] = 'FAILED' - python_execution_result_dict['exitcode'] = 13 - # We call method in a separate thread - execution_thread = Thread(target = patched_aq_execute_command , - args = (self.datanode_install_command, )) - execution_thread.start() - unfreeze_flag.set() - # check in progress report - # wait until ready - report = actionQueue.result() - while len(report['reports']) == 0 or \ - report['reports'][0]['status'] == 'IN_PROGRESS': - time.sleep(0.1) - report = actionQueue.result() - # check report - expected = {'status': 'FAILED', - 'stderr': 'stderr', - 'stdout': 'out', - 'clusterName': u'cc', - 'structuredOut': '""', - 'roleCommand': u'INSTALL', - 'serviceName': u'HDFS', - 'role': u'DATANODE', - 'actionId': '1-1', - 'taskId': 3, - 'exitCode': 13} - self.assertEqual(len(report['reports']), 1) - self.assertEqual(report['reports'][0], expected) - - # now should not have reports (read complete/failed reports are deleted) - report = actionQueue.result() - self.assertEqual(len(report['reports']), 0) - - ### Test upgrade command ### - python_execution_result_dict['status'] = 'COMPLETE' - python_execution_result_dict['exitcode'] = 0 - execution_thread = Thread(target = patched_aq_execute_command , - args = (self.datanode_upgrade_command, )) - execution_thread.start() - unfreeze_flag.set() - # wait until ready - report = actionQueue.result() - while len(report['reports']) == 0 or \ - report['reports'][0]['status'] == 'IN_PROGRESS': - time.sleep(0.1) - report = actionQueue.result() - # check report - expected = {'status': 'COMPLETED', - 'stderr': 'stderr', - 'stdout': 'out', - 'clusterName': 'clusterName', - 'structuredOut': '""', - 'roleCommand': 'UPGRADE', - 'serviceName': 'serviceName', - 'role': 'role', - 'actionId': 17, - 'taskId': 'taskId', - 'exitCode': 0} - self.assertEqual(len(report['reports']), 1) - self.assertEqual(report['reports'][0], expected) - - # now should not have reports (read complete/failed reports are deleted) - report = actionQueue.result() - self.assertEqual(len(report['reports']), 0) - - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(CustomServiceOrchestrator, "runCommand") - @patch("CommandStatusDict.CommandStatusDict") - @patch.object(ActionQueue, "status_update_callback") - def test_store_configuration_tags(self, status_update_callback_mock, - command_status_dict_mock, - cso_runCommand_mock): - custom_service_orchestrator_execution_result_dict = { - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut' : '', - 'exitcode' : 0 - } - cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict - - config = AmbariConfig() - tempdir = tempfile.gettempdir() - config.set('agent', 'prefix', tempdir) - config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") - config.set('agent', 'tolerate_download_failures', "true") - dummy_controller = MagicMock() - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.execute_command(self.datanode_restart_command) - report = actionQueue.result() - expected = {'status': 'COMPLETED', - 'configurationTags': {'global': {'tag': 'v123'}}, - 'stderr': 'stderr', - 'stdout': 'out', - 'clusterName': u'cc', - 'structuredOut': '""', - 'roleCommand': u'CUSTOM_COMMAND', - 'serviceName': u'HDFS', - 'role': u'DATANODE', - 'actionId': '1-1', - 'taskId': 9, - 'customCommand': 'RESTART', - 'exitCode': 0} - # Agent caches configurationTags if custom_command RESTART completed - self.assertEqual(len(report['reports']), 1) - self.assertEqual(expected, report['reports'][0]) - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(ActualConfigHandler, "write_client_components") - @patch.object(CustomServiceOrchestrator, "runCommand") - @patch("CommandStatusDict.CommandStatusDict") - @patch.object(ActionQueue, "status_update_callback") - def test_store_configuration_tags_no_clients(self, status_update_callback_mock, - command_status_dict_mock, - cso_runCommand_mock, write_client_components_mock): - custom_service_orchestrator_execution_result_dict = { - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut' : '', - 'exitcode' : 0 - } - cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict - - config = AmbariConfig() - tempdir = tempfile.gettempdir() - config.set('agent', 'prefix', tempdir) - config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") - config.set('agent', 'tolerate_download_failures', "true") - dummy_controller = MagicMock() - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.execute_command(self.datanode_restart_command_no_clients_update) - report = actionQueue.result() - expected = {'status': 'COMPLETED', - 'configurationTags': {'global': {'tag': 'v123'}}, - 'stderr': 'stderr', - 'stdout': 'out', - 'clusterName': u'cc', - 'structuredOut': '""', - 'roleCommand': u'CUSTOM_COMMAND', - 'serviceName': u'HDFS', - 'role': u'DATANODE', - 'actionId': '1-1', - 'taskId': 9, - 'customCommand': 'RESTART', - 'exitCode': 0} - # Agent caches configurationTags if custom_command RESTART completed - self.assertEqual(len(report['reports']), 1) - self.assertEqual(expected, report['reports'][0]) - self.assertFalse(write_client_components_mock.called) - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(ActionQueue, "status_update_callback") - @patch.object(CustomServiceOrchestrator, "requestComponentStatus") - @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") - @patch.object(ActionQueue, "execute_command") - @patch.object(LiveStatus, "build") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_status_command(self, CustomServiceOrchestrator_mock, - build_mock, execute_command_mock, requestComponentSecurityState_mock, - requestComponentStatus_mock, - status_update_callback): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - - build_mock.return_value = {'dummy report': '' } - - dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) - - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = {'exitcode': 0 } - - requestComponentSecurityState_mock.reset_mock() - requestComponentSecurityState_mock.return_value = 'UNKNOWN' - - actionQueue.execute_status_command(self.status_command) - report = actionQueue.result() - expected = {'dummy report': '', - 'securityState' : 'UNKNOWN'} - - self.assertEqual(len(report['componentStatus']), 1) - self.assertEqual(report['componentStatus'][0], expected) - self.assertTrue(requestComponentStatus_mock.called) - - @patch.object(RecoveryManager, "command_exists") - @patch.object(RecoveryManager, "requires_recovery") - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(ActionQueue, "status_update_callback") - @patch.object(CustomServiceOrchestrator, "requestComponentStatus") - @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") - @patch.object(ActionQueue, "execute_command") - @patch.object(LiveStatus, "build") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock, - build_mock, execute_command_mock, requestComponentSecurityState_mock, - requestComponentStatus_mock, - status_update_callback, requires_recovery_mock, - command_exists_mock): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - - build_mock.return_value = {'dummy report': '' } - requires_recovery_mock.return_value = True - command_exists_mock.return_value = False - - dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False) - - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = {'exitcode': 0 } - - requestComponentSecurityState_mock.reset_mock() - requestComponentSecurityState_mock.return_value = 'UNKNOWN' - - actionQueue.execute_status_command(self.status_command) - report = actionQueue.result() - expected = {'dummy report': '', - 'securityState' : 'UNKNOWN', - 'sendExecCmdDet': 'True'} - - self.assertEqual(len(report['componentStatus']), 1) - self.assertEqual(report['componentStatus'][0], expected) - self.assertTrue(requestComponentStatus_mock.called) - - requires_recovery_mock.return_value = True - command_exists_mock.return_value = True - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = {'exitcode': 0 } - - requestComponentSecurityState_mock.reset_mock() - requestComponentSecurityState_mock.return_value = 'UNKNOWN' - - actionQueue.execute_status_command(self.status_command) - report = actionQueue.result() - expected = {'dummy report': '', - 'securityState' : 'UNKNOWN', - 'sendExecCmdDet': 'False'} - - self.assertEqual(len(report['componentStatus']), 1) - self.assertEqual(report['componentStatus'][0], expected) - self.assertTrue(requestComponentStatus_mock.called) - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(ActionQueue, "status_update_callback") - @patch.object(CustomServiceOrchestrator, "requestComponentStatus") - @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") - @patch.object(ActionQueue, "execute_command") - @patch.object(LiveStatus, "build") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock, - requestComponentSecurityState_mock, - build_mock, execute_command_mock, - requestComponentStatus_mock, - status_update_callback): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - - - requestComponentStatus_mock.reset_mock() - requestComponentStatus_mock.return_value = { - 'exitcode': 0, - 'stdout': 'out', - 'stderr': 'err', - 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] } - } - build_mock.return_value = {'somestatusresult': 'aresult'} - - actionQueue.execute_status_command(self.status_command_for_alerts) - - report = actionQueue.result() - - self.assertTrue(requestComponentStatus_mock.called) - self.assertEqual(len(report['componentStatus']), 1) - self.assertTrue(report['componentStatus'][0].has_key('alerts')) - - @patch.object(AmbariConfig, "get_parallel_exec_option") - @patch.object(ActionQueue, "process_command") - @patch.object(Queue, "get") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_reset_queue(self, CustomServiceOrchestrator_mock, - get_mock, process_command_mock, gpeo_mock): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) - config = MagicMock() - gpeo_mock.return_value = 0 - config.get_parallel_exec_option = gpeo_mock - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.start() - actionQueue.put([self.datanode_install_command, self.hbase_install_command]) - self.assertEqual(2, actionQueue.commandQueue.qsize()) - self.assertTrue(actionQueue.tasks_in_progress_or_pending()) - actionQueue.reset() - self.assertTrue(actionQueue.commandQueue.empty()) - self.assertFalse(actionQueue.tasks_in_progress_or_pending()) - time.sleep(0.1) - actionQueue.stop() - actionQueue.join() - self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') - - @patch.object(AmbariConfig, "get_parallel_exec_option") - @patch.object(ActionQueue, "process_command") - @patch.object(Queue, "get") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_cancel(self, CustomServiceOrchestrator_mock, - get_mock, process_command_mock, gpeo_mock): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - config = MagicMock() - gpeo_mock.return_value = 0 - config.get_parallel_exec_option = gpeo_mock - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.start() - actionQueue.put([self.datanode_install_command, self.hbase_install_command]) - self.assertEqual(2, actionQueue.commandQueue.qsize()) - actionQueue.reset() - self.assertTrue(actionQueue.commandQueue.empty()) - time.sleep(0.1) - actionQueue.stop() - actionQueue.join() - self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') - - @patch.object(AmbariConfig, "get_parallel_exec_option") - @patch.object(ActionQueue, "process_command") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_parallel_exec(self, CustomServiceOrchestrator_mock, - process_command_mock, gpeo_mock): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - config = MagicMock() - gpeo_mock.return_value = 1 - config.get_parallel_exec_option = gpeo_mock - actionQueue = ActionQueue(config, dummy_controller) - actionQueue.put([self.datanode_install_command, self.hbase_install_command]) - self.assertEqual(2, actionQueue.commandQueue.qsize()) - actionQueue.start() - time.sleep(1) - actionQueue.stop() - actionQueue.join() - self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') - self.assertEqual(2, process_command_mock.call_count) - process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) - - - @not_for_platform(PLATFORM_LINUX) - @patch("time.sleep") - @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_retryable_command(self, CustomServiceOrchestrator_mock, - sleep_mock - ): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - python_execution_result_dict = { - 'exitcode': 1, - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut': '', - 'status': 'FAILED' - } - - def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): - return python_execution_result_dict - - command = copy.deepcopy(self.retryable_command) - with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: - runCommand_mock.side_effect = side_effect - actionQueue.execute_command(command) - - #assert that python executor start - self.assertTrue(runCommand_mock.called) - self.assertEqual(3, runCommand_mock.call_count) - self.assertEqual(2, sleep_mock.call_count) - sleep_mock.assert_has_calls([call(2), call(3)], False) - runCommand_mock.assert_has_calls([ - call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', - os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False), - call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', - os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True), - call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', - os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)]) - - - @patch("time.time") - @patch("time.sleep") - @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock, - sleep_mock, time_mock - ): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - python_execution_result_dict = { - 'exitcode': 1, - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut': '', - 'status': 'FAILED' - } - - times_arr = [8, 10, 14, 18, 22] - if self.logger.isEnabledFor(logging.INFO): - times_arr.insert(0, 4) - time_mock.side_effect = times_arr - - def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): - return python_execution_result_dict - - command = copy.deepcopy(self.retryable_command) - with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: - runCommand_mock.side_effect = side_effect - actionQueue.execute_command(command) - - #assert that python executor start - self.assertTrue(runCommand_mock.called) - self.assertEqual(2, runCommand_mock.call_count) - self.assertEqual(1, sleep_mock.call_count) - sleep_mock.assert_has_calls([call(2)], False) - runCommand_mock.assert_has_calls([ - call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', - os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False), - call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', - os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)]) - - #retryable_command - @not_for_platform(PLATFORM_LINUX) - @patch("time.sleep") - @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock, - sleep_mock - ): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - execution_result_fail_dict = { - 'exitcode': 1, - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut': '', - 'status': 'FAILED' - } - execution_result_succ_dict = { - 'exitcode': 0, - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut': '', - 'status': 'COMPLETED' - } - - command = copy.deepcopy(self.retryable_command) - with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: - runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict] - actionQueue.execute_command(command) - - #assert that python executor start - self.assertTrue(runCommand_mock.called) - self.assertEqual(2, runCommand_mock.call_count) - self.assertEqual(1, sleep_mock.call_count) - sleep_mock.assert_any_call(2) - - @not_for_platform(PLATFORM_LINUX) - @patch("time.sleep") - @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock, - sleep_mock - ): - CustomServiceOrchestrator_mock.return_value = None - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - execution_result_succ_dict = { - 'exitcode': 0, - 'stdout': 'out', - 'stderr': 'stderr', - 'structuredOut': '', - 'status': 'COMPLETED' - } - - command = copy.deepcopy(self.retryable_command) - with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: - runCommand_mock.side_effect = [execution_result_succ_dict] - actionQueue.execute_command(command) - - #assert that python executor start - self.assertTrue(runCommand_mock.called) - self.assertFalse(sleep_mock.called) - self.assertEqual(1, runCommand_mock.call_count) - - @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch.object(CustomServiceOrchestrator, "runCommand") - @patch.object(CustomServiceOrchestrator, "__init__") - def test_execute_background_command(self, CustomServiceOrchestrator_mock, - runCommand_mock, - ): - CustomServiceOrchestrator_mock.return_value = None - CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0, - 'stdout': 'out-11', - 'stderr' : 'err-13'} - - dummy_controller = MagicMock() - actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - - execute_command = copy.deepcopy(self.background_command) - actionQueue.put([execute_command]) - actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandQueueSafeEmpty(); - - #assert that python execturor start - self.assertTrue(runCommand_mock.called) - runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId']) - self.assertTrue(runningCommand is not None) - self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS) - - report = actionQueue.result() - self.assertEqual(len(report['reports']),1) - - @patch.object(CustomServiceOrchestrator, "get_py_executor") - @patch.object(CustomServiceOrchestrator, "resolve_script_path") - def test_execute_python_executor(self, resolve_script_path_mock, - get_py_executor_mock): - - dummy_controller = MagicMock() - cfg = AmbariConfig() - cfg.set('agent', 'tolerate_download_failures', 'true') - cfg.set('agent', 'prefix', '.') - cfg.set('agent', 'cache_dir', 'background_tasks') - - actionQueue = ActionQueue(cfg, dummy_controller) - pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config) - patch_output_file(pyex) - get_py_executor_mock.return_value = pyex - actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock() - - result = {} - lock = threading.RLock() - complete_done = threading.Condition(lock) - - def command_complete_w(process_condensed_result, handle): - with lock: - result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result), - 'handle' : copy.copy(handle), - 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId']) - } - complete_done.notifyAll() - - actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback, - None, command_complete_w) - actionQueue.put([self.background_command]) - actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandQueueSafeEmpty(); - - with lock: - complete_done.wait(0.1) - - finished_status = result['command_complete']['command_status'] - self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS) - self.assertEqual(finished_status['stdout'], 'process_out') - self.assertEqual(finished_status['stderr'], 'process_err') - self.assertEqual(finished_status['exitCode'], 0) - - - runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId']) - self.assertTrue(runningCommand is not None) - - report = actionQueue.result() - self.assertEqual(len(report['reports']),1) - self.assertEqual(report['reports'][0]['stdout'],'process_out') -# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}') - - - - cancel_background_command = { - "commandType":"CANCEL_COMMAND", - "role":"AMBARI_SERVER_ACTION", - "roleCommand":"ABORT", - "commandId":"2--1", - "taskId":20, - "clusterName":"c1", - "serviceName":"", - "hostname":"c6401", - "roleParams":{ - "cancelTaskIdTargets":"13,14" - }, - } - -def patch_output_file(pythonExecutor): - def windows_py(command, tmpout, tmperr): - proc = MagicMock() - proc.pid = 33 - proc.returncode = 0 - with tmpout: - tmpout.write('process_out') - with tmperr: - tmperr.write('process_err') - return proc - def open_subprocess_files_win(fout, ferr, f): - return MagicMock(), MagicMock() - def read_result_from_files(out_path, err_path, structured_out_path): - return 'process_out', 'process_err', '{"a": "b."}' - pythonExecutor.launch_python_subprocess = windows_py - pythonExecutor.open_subprocess_files = open_subprocess_files_win - pythonExecutor.read_result_from_files = read_result_from_files - -def wraped(func, before = None, after = None): - def wrapper(*args, **kwargs): - if(before is not None): - before(*args, **kwargs) - ret = func(*args, **kwargs) - if(after is not None): - after(*args, **kwargs) - return ret - return wrapper -
