Updated Branches: refs/heads/trunk 78e3d4142 -> 489a193dd
AMBARI-3490. Remove RCO management logic at ambari-agent (dlysnichencko) Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/489a193d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/489a193d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/489a193d Branch: refs/heads/trunk Commit: 489a193dd2f54215f8b6f9dc9890032ca746b301 Parents: 78e3d41 Author: Lisnichenko Dmitro <[email protected]> Authored: Wed Oct 9 21:10:30 2013 +0300 Committer: Lisnichenko Dmitro <[email protected]> Committed: Wed Oct 9 21:10:30 2013 +0300 ---------------------------------------------------------------------- ambari-agent/pom.xml | 4 - .../ambari_agent/ActionDependencyManager.py | 163 ------------ .../src/main/python/ambari_agent/ActionQueue.py | 79 ++---- .../src/main/python/ambari_agent/Heartbeat.py | 2 +- .../main/python/ambari_agent/UpgradeExecutor.py | 207 --------------- .../test/python/TestActionDependencyManager.py | 180 ------------- ambari-agent/src/test/python/TestActionQueue.py | 132 ++++------ ambari-agent/src/test/python/TestController.py | 5 +- ambari-agent/src/test/python/TestHeartbeat.py | 30 +-- .../src/test/python/TestUpgradeExecutor.py | 264 ------------------- 10 files changed, 86 insertions(+), 980 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml index 0341f0d..8681420 100644 --- a/ambari-agent/pom.xml +++ b/ambari-agent/pom.xml @@ -328,10 +328,6 @@ <location>../version</location> <filter>true</filter> </source> - <!--<source>--> - <!--<!– This file is also included into server rpm–>--> - <!--<location>../ambari-common/src/main/resources/role_command_order.json</location>--> - <!--</source>--> </sources> </mapping> <!-- --> http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py b/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py deleted file mode 100644 index 0addc9a..0000000 --- a/ambari-agent/src/main/python/ambari_agent/ActionDependencyManager.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python2.6 - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -import logging -import Queue -import threading -import pprint -import os -import json - -logger = logging.getLogger() - -class ActionDependencyManager(): - """ - Implemments a scheduler of role commands (like DATANODE-START) based on - dependencies between them. Class does not execute actions, it only - breaks them on groups that may be executed in parallel. - """ - - DEPS_FILE_NAME="role_command_order.json" - COMMENT_STR="_comment" - - # Dictionary of dependencies. Format: - # BlockedRole-Command : [BlockerRole1-Command1, BlockerRole2-Command2, ...] - - - def __init__(self, config): - self.deps = {} - self.last_scheduled_group = [] - self.scheduled_action_groups = Queue.Queue() - self.lock = threading.RLock() - self.config = config - #self.read_dependencies() - - - def read_dependencies(self): - """ - Load dependencies from file - """ - prefix_dir = self.config.get('agent', 'prefix') - action_order_file = os.path.join(prefix_dir, self.DEPS_FILE_NAME) - with open(action_order_file) as f: - action_order_data = json.load(f) - for deps_group in action_order_data.keys(): - if deps_group != self.COMMENT_STR: # if entry is not a comment - deps_group_list = action_order_data[deps_group] - for blocked_str in deps_group_list: - if blocked_str != self.COMMENT_STR: # if entry is not a comment - blocker_list = deps_group_list[blocked_str] - if blocked_str not in self.deps: - self.deps[blocked_str]=[] - for blocker_str in blocker_list: - self.deps[blocked_str].append(blocker_str) - pass - - - def is_action_group_available(self): - return not self.scheduled_action_groups.empty() - - - def get_next_action_group(self): - """ - Returns next group of scheduled actions that may be - executed in parallel. If queue is empty, blocks until - an item is available (until next put_action() call) - """ - next_group = self.scheduled_action_groups.get(block=True) - with self.lock: # Synchronized - if next_group is self.last_scheduled_group: - # Group is not eligible for appending, creating new one - self.last_scheduled_group = [] - - dump_str = pprint.pformat(next_group) - logger.debug("Next action group: {0}".format(dump_str)) - return next_group - - - def put_actions(self, actions): - """ - Schedules actions to be executed in some time at future. - Here we rely on serial command execution sequence received from server. - Some of these commands may be executed in parallel with others, so we - unite them into a group. - """ - with self.lock: # Synchronized - for action in actions: - self.dump_info(action) - was_empty = len(self.last_scheduled_group) == 0 - if self.can_be_executed_in_parallel(action, self.last_scheduled_group): - self.last_scheduled_group.append(action) - else: # create a new group - self.last_scheduled_group = [action] - was_empty = True - if was_empty: - # last_scheduled_group is not empty now, so we add it to the queue - self.scheduled_action_groups.put(self.last_scheduled_group) - - - def dump_info(self, action): - """ - Prints info about command to log - """ - logger.info("Adding " + action['commandType'] + " for service " + \ - action['serviceName'] + " of cluster " + \ - action['clusterName'] + " to the queue.") - logger.debug(pprint.pformat(action)) - - - def can_be_executed_in_parallel(self, action, group): - """ - Checks whether action may be executed in parallel with a given group - """ - # Hack: parallel execution disabled - return False - - # from ActionQueue import ActionQueue - # # Empty group is compatible with any action - # if not group: - # return True - # # Status commands are placed into a separate group to avoid race conditions - # if action['commandType'] == ActionQueue.STATUS_COMMAND: - # for scheduled_action in group: - # if scheduled_action['commandType'] != ActionQueue.STATUS_COMMAND: - # return False - # return True - # # We avoid executing install/upgrade threads in parallel with anything - # standalone_commands = ["INSTALL", ActionQueue.ROLE_COMMAND_UPGRADE] - # if action['roleCommand'] in standalone_commands: - # return False - # # We can not perform few actions (like STOP and START) for a component - # # at the same time - # for scheduled_action in group: - # if scheduled_action['role'] == action['role']: - # return False - # # In other cases, check dependencies - # pattern = "{0}-{1}" - # new_action_str = pattern.format(action['role'], action['roleCommand']) - # for scheduled_action in group: - # if new_action_str in self.deps: - # blockers = self.deps[new_action_str] - # scheduled_action_str = pattern.format( - # scheduled_action['role'], scheduled_action['roleCommand']) - # if scheduled_action_str in blockers: - # return False - # # Everything seems to be ok - # return True http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index c39cf72..a2ad9c5 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -17,21 +17,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' +import Queue import logging import traceback import threading -from threading import Thread import pprint import os from LiveStatus import LiveStatus from shell import shellRunner import PuppetExecutor -import UpgradeExecutor import PythonExecutor from ActualConfigHandler import ActualConfigHandler -from ActionDependencyManager import ActionDependencyManager from CommandStatusDict import CommandStatusDict @@ -49,13 +47,12 @@ class ActionQueue(threading.Thread): STATUS_COMMAND = 'STATUS_COMMAND' EXECUTION_COMMAND = 'EXECUTION_COMMAND' - ROLE_COMMAND_UPGRADE = 'UPGRADE' IN_PROGRESS_STATUS = 'IN_PROGRESS' def __init__(self, config, controller): super(ActionQueue, self).__init__() - self.commandQueue = ActionDependencyManager(config) + self.commandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = self.status_update_callback) self.config = config @@ -71,51 +68,30 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put(self, commands): - self.commandQueue.put_actions(commands) + for command in commands: + logger.info("Adding " + command['commandType'] + " for service " + \ + command['serviceName'] + " of cluster " + \ + command['clusterName'] + " to the queue.") + logger.debug(pprint.pformat(command)) + self.commandQueue.put(command) def run(self): while not self.stopped(): - # Taking a new portion of tasks - portion = self.commandQueue.get_next_action_group() # Will block if queue is empty - portion = portion[::-1] # Reverse list order - self.process_portion_of_actions(portion) - - - def process_portion_of_actions(self, portion): - # starting execution of a group of commands - running_list = [] - finished_list = [] - while portion or running_list: # While not finished actions in portion - # poll threads under execution - for thread in running_list: - if not thread.is_alive(): - finished_list.append(thread) - # Remove finished from the running list - running_list[:] = [b for b in running_list if not b in finished_list] - # Start next actions - free_slots = self.MAX_CONCURRENT_ACTIONS - len(running_list) - while free_slots > 0 and portion: # Start new threads if available - command = portion.pop() - logger.debug("Took an element of Queue: " + pprint.pformat(command)) - if command['commandType'] == self.EXECUTION_COMMAND: - # Start separate threads for commands of this type - action_thread = Thread(target = self.execute_command_safely, args = (command, )) - running_list.append(action_thread) - free_slots -= 1 - action_thread.start() - elif command['commandType'] == self.STATUS_COMMAND: - # Execute status commands immediately, in current thread - self.execute_status_command(command) - else: - logger.error("Unrecognized command " + pprint.pformat(command)) - pass + command = self.commandQueue.get() # Will block if queue is empty + self.process_command(command) - def execute_command_safely(self, command): + def process_command(self, command): + logger.debug("Took an element of Queue: " + pprint.pformat(command)) # make sure we log failures try: - self.execute_command(command) + if command['commandType'] == self.EXECUTION_COMMAND: + self.execute_command(command) + elif command['commandType'] == self.STATUS_COMMAND: + self.execute_status_command(command) + else: + logger.error("Unrecognized command " + pprint.pformat(command)) except Exception, err: # Should not happen traceback.print_exc() @@ -123,6 +99,9 @@ class ActionQueue(threading.Thread): def execute_command(self, command): + ''' + Executes commands of type EXECUTION_COMMAND + ''' clusterName = command['clusterName'] commandId = command['commandId'] @@ -147,17 +126,8 @@ class ActionQueue(threading.Thread): self.config.get('puppet', 'puppet_home'), self.config.get('puppet', 'facter_home'), self.config.get('agent', 'prefix'), self.config) - if command['roleCommand'] == ActionQueue.ROLE_COMMAND_UPGRADE: - # Create new instances for the current thread - pythonExecutor = PythonExecutor.PythonExecutor( - self.config.get('agent', 'prefix'), self.config) - upgradeExecutor = UpgradeExecutor.UpgradeExecutor(pythonExecutor, - puppetExecutor, self.config) - commandresult = upgradeExecutor.perform_stack_upgrade(command, in_progress_status['tmpout'], - in_progress_status['tmperr']) - else: - commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'], - in_progress_status['tmperr']) + commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'], + in_progress_status['tmperr']) # dumping results status = "COMPLETED" if commandresult['exitcode'] != 0: @@ -189,6 +159,9 @@ class ActionQueue(threading.Thread): def execute_status_command(self, command): + ''' + Executes commands of type STATUS_COMMAND + ''' try: cluster = command['clusterName'] service = command['serviceName'] http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/Heartbeat.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py index c369677..f0e20c4 100644 --- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py +++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py @@ -55,7 +55,7 @@ class Heartbeat: } commandsInProgress = False - if self.actionQueue.commandQueue.is_action_group_available(): + if not self.actionQueue.commandQueue.empty(): commandsInProgress = True if len(queueResult) != 0: heartbeat['reports'] = queueResult['reports'] http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py b/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py deleted file mode 100644 index b189921..0000000 --- a/ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python2.6 - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' -import json -import os.path -import logging -import pprint -import re -from Grep import Grep -from StackVersionsFileHandler import StackVersionsFileHandler - - -logger = logging.getLogger() -grep = Grep() - -class UpgradeExecutor: - - """ Class that performs the StackVersion stack upgrade""" - - SCRIPT_DIRS = [ - 'pre-upgrade.d', - 'upgrade.d', - 'post-upgrade.d' - ] - - NAME_PARSING_FAILED_CODE = 999 - - def __init__(self, pythonExecutor, puppetExecutor, config): - self.pythonExecutor = pythonExecutor - self.puppetExecutor = puppetExecutor - self.stacksDir = config.get('stack', 'upgradeScriptsDir') - self.config = config - versionsFileDir = config.get('agent', 'prefix') - self.versionsHandler = StackVersionsFileHandler(versionsFileDir) - - - def perform_stack_upgrade(self, command, tmpout, tmperr): - logger.info("Performing stack upgrade") - params = command['commandParams'] - srcStack = params['source_stack_version'] - tgtStack = params['target_stack_version'] - component = command['role'] - - srcStackTuple = self.split_stack_version(srcStack) - tgtStackTuple = self.split_stack_version(tgtStack) - - if srcStackTuple is None or tgtStackTuple is None: - errorstr = "Source (%s) or target (%s) version does not match pattern \ - <Name>-<Version>" % (srcStack, tgtStack) - logger.info(errorstr) - result = { - 'exitcode' : 1, - 'stdout' : 'None', - 'stderr' : errorstr - } - elif srcStack != tgtStack: - paramTuple = sum((srcStackTuple, tgtStackTuple), ()) - upgradeId = "%s-%s.%s_%s-%s.%s" % paramTuple - # Check stack version (do we need upgrade?) - basedir = os.path.join(self.stacksDir, upgradeId, component) - if not os.path.isdir(basedir): - errorstr = "Upgrade %s is not supported (dir %s does not exist)" \ - % (upgradeId, basedir) - logger.error(errorstr) - result = { - 'exitcode' : 1, - 'stdout' : errorstr, - 'stderr' : errorstr - } - else: - result = { - 'exitcode' : 0, - 'stdout' : '', - 'stderr' : '' - } - # Request repos update (will be executed once before running any pp file) - self.puppetExecutor.discardInstalledRepos() - for dir in self.SCRIPT_DIRS: - if result['exitcode'] != 0: - break - tmpRes = self.execute_dir(command, basedir, dir, tmpout, tmperr) - - result = { - 'exitcode' : result['exitcode'] or tmpRes['exitcode'], - 'stdout' : "%s\n%s" % (result['stdout'], tmpRes['stdout']), - 'stderr' : "%s\n%s" % (result['stderr'], tmpRes['stderr']), - } - - if result['exitcode'] == 0: - logger.info("Upgrade %s successfully finished" % upgradeId) - self.versionsHandler.write_stack_version(component, tgtStack) - else: - infostr = "target_stack_version (%s) matches current stack version" \ - " for component %s, nothing to do" % (tgtStack, component) - logger.info(infostr) - result = { - 'exitcode' : 0, - 'stdout' : infostr, - 'stderr' : 'None' - } - result = { - 'exitcode' : result['exitcode'], - 'stdout' : grep.tail(result['stdout'], grep.OUTPUT_LAST_LINES), - 'stderr' : grep.tail(result['stderr'], grep.OUTPUT_LAST_LINES) - } - return result - - - def get_key_func(self, name): - """ - Returns a number from filenames like 70-foobar.* or 999 for not matching - filenames - """ - parts = name.split('-', 1) - if not parts or not parts[0].isdigit(): - logger.warn("Can't parse script filename number %s" % name) - return self.NAME_PARSING_FAILED_CODE # unknown element will be placed to the end of list - return int(parts[0]) - - - def split_stack_version(self, verstr): - verdict = json.loads(verstr) - stack_name = verdict["stackName"].strip() - - matchObj = re.match( r'(\d+).(\d+)', verdict["stackVersion"].strip(), re.M|re.I) - if matchObj: - stack_major_ver = matchObj.group(1) - stack_minor_ver = matchObj.group(2) - return stack_name, stack_major_ver, stack_minor_ver - else: - return None - - - def execute_dir(self, command, basedir, dir, tmpout, tmperr): - """ - Executes *.py and *.pp files located in a given directory. - Files a executed in a numeric sorting order. - """ - dirpath = os.path.join(basedir, dir) - logger.info("Executing %s" % dirpath) - if not os.path.isdir(dirpath): - warnstr = "Script directory %s does not exist, skipping" % dirpath - logger.warn(warnstr) - result = { - 'exitcode' : 0, - 'stdout' : warnstr, - 'stderr' : 'None' - } - return result - fileList=os.listdir(dirpath) - fileList.sort(key = self.get_key_func) - formattedResult = { - 'exitcode' : 0, - 'stdout' : '', - 'stderr' : '' - } - for filename in fileList: - prevcode = formattedResult['exitcode'] - if prevcode != 0 or self.get_key_func(filename) == self.NAME_PARSING_FAILED_CODE: - break - filepath = os.path.join(dirpath, filename) - if filename.endswith(".pp"): - logger.info("Running puppet file %s" % filepath) - result = self.puppetExecutor.run_manifest(command, filepath, - tmpout, tmperr) - elif filename.endswith(".py"): - logger.info("Running python file %s" % filepath) - result = self.pythonExecutor.run_file(command, filepath, tmpout, tmperr) - elif filename.endswith(".pyc"): - pass # skipping compiled files - else: - warnstr = "Unrecognized file type, skipping: %s" % filepath - logger.warn(warnstr) - result = { - 'exitcode' : 0, - 'stdout' : warnstr, - 'stderr' : 'None' - } - formattedResult = { - 'exitcode' : prevcode or result['exitcode'], - 'stdout' : "%s\n%s" % (formattedResult['stdout'], result['stdout']), - 'stderr' : "%s\n%s" % (formattedResult['stderr'], result['stderr']), - } - logger.debug("Result of %s: \n %s" % (dirpath, pprint.pformat(formattedResult))) - return formattedResult - - - - - - http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestActionDependencyManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestActionDependencyManager.py b/ambari-agent/src/test/python/TestActionDependencyManager.py deleted file mode 100644 index a718779..0000000 --- a/ambari-agent/src/test/python/TestActionDependencyManager.py +++ /dev/null @@ -1,180 +0,0 @@ -#!/usr/bin/env python2.6 - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -from unittest import TestCase -from ambari_agent.AmbariConfig import AmbariConfig -from ambari_agent.ActionQueue import ActionQueue -from ambari_agent.ActionDependencyManager import ActionDependencyManager -import os, errno, time, pprint, tempfile, threading, sys -from mock.mock import patch, MagicMock, call - -class TestActionDependencyManager(TestCase): - - dummy_RCO_file = os.path.join('dummy_files', 'test_rco_data.json') - - def setUp(self): - self.config = AmbariConfig().getConfig() - self.config.set('agent', 'prefix', os.getcwd()) - ActionDependencyManager.DEPS_FILE_NAME = self.dummy_RCO_file - - # TODO: disabled for now - def disabled_test_init(self): - """ - Tests config load - """ - adm = ActionDependencyManager(self.config) - deps_dump = pprint.pformat(adm.deps) - expected = "{u'DATANODE-STOP': [u'JOBTRACKER-STOP',\n " \ - "u'TASKTRACKER-STOP',\n " \ - "u'RESOURCEMANAGER-STOP',\n " \ - "u'NODEMANAGER-STOP',\n " \ - "u'HISTORYSERVER-STOP',\n " \ - "u'HBASE_MASTER-STOP'],\n u'HBASE_MASTER-START': " \ - "[u'PEERSTATUS-START'],\n u'JOBTRACKER-START': " \ - "[u'PEERSTATUS-START'],\n u'RESOURCEMANAGER-START': " \ - "[u'NAMENODE-START', u'DATANODE-START'],\n " \ - "u'SECONDARY_NAMENODE-START': [u'DATANODE-START', " \ - "u'NAMENODE-START'],\n u'SECONDARY_NAMENODE-UPGRADE': " \ - "[u'NAMENODE-UPGRADE']}" - self.assertEqual(deps_dump, expected) - - - def test_is_action_group_available(self): - adm = ActionDependencyManager(self.config) - self.assertFalse(adm.is_action_group_available()) - adm.scheduled_action_groups.put(["test"]) - self.assertTrue(adm.is_action_group_available()) - - - def test_get_next_action_group(self): - adm = ActionDependencyManager(self.config) - test1 = ["test1"] - test2 = ["test2"] - adm.scheduled_action_groups.put(test1) - adm.scheduled_action_groups.put(test2) - adm.last_scheduled_group = test2 - self.assertTrue(adm.is_action_group_available()) - # Taking 1st - self.assertEqual(test1, adm.get_next_action_group()) - self.assertTrue(len(adm.last_scheduled_group) == 1) - self.assertTrue(adm.is_action_group_available()) - # Taking 2nd - self.assertEqual(test2, adm.get_next_action_group()) - self.assertTrue(len(adm.last_scheduled_group) == 0) - self.assertTrue(adm.last_scheduled_group is not test2) - self.assertFalse(adm.is_action_group_available()) - - - @patch.object(ActionDependencyManager, "dump_info") - @patch.object(ActionDependencyManager, "can_be_executed_in_parallel") - def test_put_action(self, can_be_executed_in_parallel_mock, dump_info_mock): - can_be_executed_in_parallel_mock.side_effect = [True, False, True, False, - True, True, True, False] - adm = ActionDependencyManager(self.config) - - adm.put_actions(list(range(0, 8))) - - queue = [] - while adm.is_action_group_available(): - next = adm.get_next_action_group() - queue.append(next) - - str = pprint.pformat(queue) - expected = "[[0], [1, 2], [3, 4, 5, 6], [7]]" - self.assertEqual(str, expected) - - - # TODO: disabled for now - def disabled_test_can_be_executed_in_parallel(self): - adm = ActionDependencyManager(self.config) - # empty group - group = [] - install_command = { - 'role': 'DATANODE', - 'roleCommand': 'INSTALL', - 'commandType': ActionQueue.EXECUTION_COMMAND - } - upgrade_command = { - 'role': 'DATANODE', - 'roleCommand': 'UPGRADE', - 'commandType': ActionQueue.EXECUTION_COMMAND - } - start_command = { - 'role': 'DATANODE', - 'roleCommand': 'START', - 'commandType': ActionQueue.EXECUTION_COMMAND - } - stop_command = { - 'role': 'DATANODE', - 'roleCommand': 'STOP', - 'commandType': ActionQueue.EXECUTION_COMMAND - } - status_command = { - 'commandType': ActionQueue.STATUS_COMMAND - } - rm_start_command = { - 'role': 'RESOURCEMANAGER', - 'roleCommand': 'START', - 'commandType': ActionQueue.EXECUTION_COMMAND - } - hm_start_command = { - 'role': 'HBASE_MASTER', - 'roleCommand': 'START', - 'commandType': ActionQueue.EXECUTION_COMMAND - } - self.assertTrue(adm.can_be_executed_in_parallel(install_command, group)) - self.assertTrue(adm.can_be_executed_in_parallel(status_command, group)) - # multiple status commands - group = [] - for i in range(0, 3): - group.append(status_command) - self.assertTrue(adm.can_be_executed_in_parallel(status_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(install_command, group)) - # new status command - group = [install_command] - self.assertFalse(adm.can_be_executed_in_parallel(status_command, group)) - # install/upgrade commands - group = [install_command] - self.assertFalse(adm.can_be_executed_in_parallel(install_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(status_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(start_command, group)) - group = [upgrade_command] - self.assertFalse(adm.can_be_executed_in_parallel(install_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(status_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(start_command, group)) - # Other commands - group = [start_command] - self.assertFalse(adm.can_be_executed_in_parallel(install_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(upgrade_command, group)) - self.assertFalse(adm.can_be_executed_in_parallel(status_command, group)) - self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group)) - # Check dependency processing - group = [start_command] - self.assertFalse(adm.can_be_executed_in_parallel(rm_start_command, group)) - group = [start_command] - self.assertTrue(adm.can_be_executed_in_parallel(hm_start_command, group)) - # actions for the same component - group = [start_command] - self.assertFalse(adm.can_be_executed_in_parallel(stop_command, group)) - group = [stop_command] - self.assertFalse(adm.can_be_executed_in_parallel(start_command, group)) - http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestActionQueue.py b/ambari-agent/src/test/python/TestActionQueue.py index 6fa1407..31643b2 100644 --- a/ambari-agent/src/test/python/TestActionQueue.py +++ b/ambari-agent/src/test/python/TestActionQueue.py @@ -17,13 +17,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' +from Queue import Queue from unittest import TestCase from ambari_agent.LiveStatus import LiveStatus from ambari_agent.PuppetExecutor import PuppetExecutor from ambari_agent.ActionQueue import ActionQueue from ambari_agent.AmbariConfig import AmbariConfig -from ambari_agent.ActionDependencyManager import ActionDependencyManager import os, errno, time, pprint, tempfile, threading import StringIO import sys @@ -31,7 +31,6 @@ from threading import Thread from mock.mock import patch, MagicMock, call from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler -from ambari_agent.UpgradeExecutor import UpgradeExecutor class TestActionQueue(TestCase): @@ -126,106 +125,79 @@ class TestActionQueue(TestCase): } - @patch.object(ActionDependencyManager, "read_dependencies") - @patch.object(ActionDependencyManager, "get_next_action_group") - @patch.object(ActionQueue, "process_portion_of_actions") - def test_ActionQueueStartStop(self, process_portion_of_actions_mock, - get_next_action_group_mock, read_dependencies_mock): + @patch.object(ActionQueue, "process_command") + @patch.object(Queue, "get") + def test_ActionQueueStartStop(self, get_mock, process_command_mock): actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller') actionQueue.start() time.sleep(0.1) actionQueue.stop() actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') - self.assertTrue(get_next_action_group_mock.call_count > 1) - self.assertTrue(process_portion_of_actions_mock.call_count > 1) + self.assertTrue(process_command_mock.call_count > 1) - @patch.object(ActionDependencyManager, "read_dependencies") + @patch("traceback.print_exc") @patch.object(ActionQueue, "execute_command") @patch.object(ActionQueue, "execute_status_command") - def test_process_portion_of_actions(self, execute_status_command_mock, - executeCommand_mock, read_dependencies_mock): + def test_process_command(self, execute_status_command_mock, + execute_command_mock, print_exc_mock): actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller') - # Test execution of EXECUTION_COMMANDs - max = 3 - actionQueue.MAX_CONCURRENT_ACTIONS = max - unfreeze_flag = threading.Event() - sync_lock = threading.RLock() - stats = { - 'waiting_threads' : 0 + execution_command = { + 'commandType' : ActionQueue.EXECUTION_COMMAND, } - def side_effect(self): - with sync_lock: # Synchtonized to avoid race effects during test execution - stats['waiting_threads'] += 1 - unfreeze_flag.wait() - executeCommand_mock.side_effect = side_effect - portion = [self.datanode_install_command, - self.namenode_install_command, - self.snamenode_install_command, - self.nagios_install_command, - self.hbase_install_command] + status_command = { + 'commandType' : ActionQueue.STATUS_COMMAND, + } + wrong_command = { + 'commandType' : "SOME_WRONG_COMMAND", + } + # Try wrong command + actionQueue.process_command(wrong_command) + self.assertFalse(execute_command_mock.called) + self.assertFalse(execute_status_command_mock.called) + self.assertFalse(print_exc_mock.called) - # We call method in a separate thread - action_thread = Thread(target = actionQueue.process_portion_of_actions, args = (portion, )) - action_thread.start() - # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running - while stats['waiting_threads'] != max: - time.sleep(0.1) - self.assertEqual(stats['waiting_threads'], max) - # unfreezing waiting threads - unfreeze_flag.set() - # wait until all threads are finished - action_thread.join() - self.assertTrue(executeCommand_mock.call_count == 5) + execute_command_mock.reset_mock() + execute_status_command_mock.reset_mock() + print_exc_mock.reset_mock() + # Try normal execution + actionQueue.process_command(execution_command) + self.assertTrue(execute_command_mock.called) self.assertFalse(execute_status_command_mock.called) - executeCommand_mock.reset_mock() + self.assertFalse(print_exc_mock.called) + + execute_command_mock.reset_mock() execute_status_command_mock.reset_mock() + print_exc_mock.reset_mock() - # Test execution of STATUS_COMMANDs - n = 5 - portion = [] - for i in range(0, n): - status_command = { - 'componentName': 'DATANODE', - 'commandType': 'STATUS_COMMAND', - } - portion.append(status_command) - actionQueue.process_portion_of_actions(portion) - self.assertTrue(execute_status_command_mock.call_count == n) - self.assertFalse(executeCommand_mock.called) - - # Test execution of unknown command - unknown_command = { - 'commandType': 'WRONG_COMMAND', - } - portion = [unknown_command] - actionQueue.process_portion_of_actions(portion) - # no exception expected - pass + actionQueue.process_command(status_command) + self.assertFalse(execute_command_mock.called) + self.assertTrue(execute_status_command_mock.called) + self.assertFalse(print_exc_mock.called) + execute_command_mock.reset_mock() + execute_status_command_mock.reset_mock() + print_exc_mock.reset_mock() - @patch("traceback.print_exc") - @patch.object(ActionDependencyManager, "read_dependencies") - @patch.object(ActionQueue, "execute_command") - def test_execute_command_safely(self, execute_command_mock, - read_dependencies_mock, print_exc_mock): - actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller') - # Try normal execution - actionQueue.execute_command_safely('command') - # Try exception ro check proper logging + # Try exception to check proper logging def side_effect(self): raise Exception("TerribleException") execute_command_mock.side_effect = side_effect - actionQueue.execute_command_safely('command') + actionQueue.process_command(execution_command) + self.assertTrue(print_exc_mock.called) + + print_exc_mock.reset_mock() + + execute_status_command_mock.side_effect = side_effect + actionQueue.process_command(execution_command) self.assertTrue(print_exc_mock.called) + @patch("__builtin__.open") @patch.object(ActionQueue, "status_update_callback") - @patch.object(ActionDependencyManager, "read_dependencies") - def test_execute_command(self, read_dependencies_mock, - status_update_callback_mock, open_mock): + def test_execute_command(self, status_update_callback_mock, open_mock): # Make file read calls visible def open_side_effect(file, mode): if mode == 'r': @@ -251,10 +223,7 @@ class TestActionQueue(TestCase): def patched_aq_execute_command(command): # We have to perform patching for separate thread in the same thread with patch.object(PuppetExecutor, "runCommand") as runCommand_mock: - with patch.object(UpgradeExecutor, "perform_stack_upgrade") \ - as perform_stack_upgrade_mock: runCommand_mock.side_effect = side_effect - perform_stack_upgrade_mock.side_effect = side_effect actionQueue.execute_command(command) ### Test install/start/stop command ### ## Test successful execution with configuration tags @@ -379,11 +348,10 @@ class TestActionQueue(TestCase): @patch.object(ActionQueue, "status_update_callback") @patch.object(StackVersionsFileHandler, "read_stack_version") - @patch.object(ActionDependencyManager, "read_dependencies") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") def test_execute_status_command(self, build_mock, execute_command_mock, - read_dependencies_mock, read_stack_version_mock, + read_stack_version_mock, status_update_callback): actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller') build_mock.return_value = "dummy report" @@ -392,4 +360,4 @@ class TestActionQueue(TestCase): report = actionQueue.result() expected = 'dummy report' self.assertEqual(len(report['componentStatus']), 1) - self.assertEqual(report['componentStatus'][0], expected) \ No newline at end of file + self.assertEqual(report['componentStatus'][0], expected) http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestController.py b/ambari-agent/src/test/python/TestController.py index 79340ed..2b0e614 100644 --- a/ambari-agent/src/test/python/TestController.py +++ b/ambari-agent/src/test/python/TestController.py @@ -23,7 +23,6 @@ import StringIO import ssl import unittest, threading from ambari_agent import Controller, ActionQueue -from ambari_agent.ActionDependencyManager import ActionDependencyManager from ambari_agent import hostname import sys from ambari_agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE @@ -153,9 +152,7 @@ class TestController(unittest.TestCase): @patch("urllib2.build_opener") @patch("urllib2.install_opener") @patch.object(ActionQueue.ActionQueue, "run") - @patch.object(ActionDependencyManager, "read_dependencies") - @patch.object(ActionDependencyManager, "dump_info") - def test_repeatRegistration(self, dump_info_mock, read_dependencies_mock, + def test_repeatRegistration(self, run_mock, installMock, buildMock): registerAndHeartbeat = MagicMock(name="registerAndHeartbeat") http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestHeartbeat.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestHeartbeat.py b/ambari-agent/src/test/python/TestHeartbeat.py index cceb764..5ef5d72 100644 --- a/ambari-agent/src/test/python/TestHeartbeat.py +++ b/ambari-agent/src/test/python/TestHeartbeat.py @@ -21,7 +21,6 @@ limitations under the License. from unittest import TestCase import unittest from ambari_agent.Heartbeat import Heartbeat -from ambari_agent.ActionDependencyManager import ActionDependencyManager from ambari_agent.ActionQueue import ActionQueue from ambari_agent.LiveStatus import LiveStatus from ambari_agent import AmbariConfig @@ -47,8 +46,7 @@ class TestHeartbeat(TestCase): sys.stdout = sys.__stdout__ - @patch.object(ActionDependencyManager, "read_dependencies") - def test_build(self, read_dependencies_mock): + def test_build(self): actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller') heartbeat = Heartbeat(actionQueue) result = heartbeat.build(100) @@ -66,12 +64,9 @@ class TestHeartbeat(TestCase): self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress") - @patch.object(ActionDependencyManager, "read_dependencies") @patch.object(ActionQueue, "result") - @patch.object(ActionDependencyManager, "is_action_group_available") @patch.object(HostInfo, "register") - def test_no_mapping(self, register_mock, is_action_group_available_mock, result_mock, - read_dependencies_mock): + def test_no_mapping(self, register_mock, result_mock): result_mock.return_value = { 'reports': [{'status': 'IN_PROGRESS', 'stderr': 'Read from /tmp/errors-3.txt', @@ -95,11 +90,8 @@ class TestHeartbeat(TestCase): self.assertEqual(register_mock.call_args_list[0][0][1], False) - @patch.object(ActionDependencyManager, "read_dependencies") @patch.object(ActionQueue, "result") - @patch.object(ActionDependencyManager, "is_action_group_available") - def test_build_long_result(self, is_action_group_available_mock, result_mock, - read_dependencies_mock): + def test_build_long_result(self, result_mock): actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller') result_mock.return_value = { 'reports': [{'status': 'IN_PROGRESS', @@ -184,22 +176,17 @@ class TestHeartbeat(TestCase): self.assertEquals(hb, expected) - @patch.object(ActionDependencyManager, "read_dependencies") - @patch.object(ActionDependencyManager, "dump_info") - @patch.object(ActionDependencyManager, "can_be_executed_in_parallel") @patch.object(HostInfo, 'register') - def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock, - can_be_executed_in_parallel_mock, dump_info_mock, read_dependencies_mock): - can_be_executed_in_parallel_mock.return_value = False + def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock): actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller') statusCommand = { "serviceName" : 'HDFS', "commandType" : "STATUS_COMMAND", - "clusterName" : "", + "clusterName" : "c1", "componentName" : "DATANODE", 'configurations':{'global' : {}} } - actionQueue.put(list(statusCommand)) + actionQueue.put([statusCommand]) heartbeat = Heartbeat(actionQueue) heartbeat.build(12, 6) @@ -209,9 +196,8 @@ class TestHeartbeat(TestCase): self.assertFalse(args[1]) - @patch.object(ActionDependencyManager, "read_dependencies") @patch.object(HostInfo, 'register') - def test_heartbeat_host_check_no_cmd(self, register_mock, read_dependencies_mock): + def test_heartbeat_host_check_no_cmd(self, register_mock): actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig(),'dummy_controller') heartbeat = Heartbeat(actionQueue) heartbeat.build(12, 6) @@ -222,4 +208,4 @@ class TestHeartbeat(TestCase): if __name__ == "__main__": - unittest.main(verbosity=2) \ No newline at end of file + unittest.main(verbosity=2) http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/489a193d/ambari-agent/src/test/python/TestUpgradeExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestUpgradeExecutor.py b/ambari-agent/src/test/python/TestUpgradeExecutor.py deleted file mode 100644 index 7abb959..0000000 --- a/ambari-agent/src/test/python/TestUpgradeExecutor.py +++ /dev/null @@ -1,264 +0,0 @@ -#!/usr/bin/env python2.6 - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -from unittest import TestCase -import unittest -import StringIO -import socket -import os, sys, pprint, json -from mock.mock import patch -from mock.mock import MagicMock -from mock.mock import create_autospec -import os, errno, tempfile -from ambari_agent import UpgradeExecutor -import logging -from ambari_agent import AmbariConfig -from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler - -class TestUpgradeExecutor(TestCase): - - logger = logging.getLogger() - - @patch.object(StackVersionsFileHandler, 'write_stack_version') - @patch('os.path.isdir') - def test_perform_stack_upgrade(self, isdir_method, write_stack_version_method): - puppetExecutor = MagicMock() - executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor', - puppetExecutor, AmbariConfig.AmbariConfig().getConfig()) - - # Checking matching versions - command = { - 'commandParams' : { - 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - }, - 'role' : 'HDFS' - } - result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr') - self.assertTrue('matches current stack version' in result['stdout']) - self.assertFalse(write_stack_version_method.called) - # Checking unsupported update - write_stack_version_method.reset() - command = { - 'commandParams' : { - 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}', - 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - }, - 'role' : 'HDFS' - } - isdir_method.return_value = False - result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr') - self.assertTrue('not supported' in result['stderr']) - self.assertFalse(write_stack_version_method.called) - # Checking wrong source version - write_stack_version_method.reset() - command = { - 'commandParams' : { - 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}', - 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - }, - 'role' : 'HDFS' - } - result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr') - self.assertTrue('does not match pattern' in result['stderr']) - self.assertFalse(write_stack_version_method.called) - # Checking wrong target version - write_stack_version_method.reset() - command = { - 'commandParams' : { - 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"Wrong\"}', - }, - 'role' : 'HDFS' - } - result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr') - self.assertTrue('does not match pattern' in result['stderr']) - self.assertFalse(write_stack_version_method.called) - # Checking successful result - write_stack_version_method.reset() - command = { - 'commandParams' : { - 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}', - 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - }, - 'role' : 'HDFS' - } - isdir_method.return_value = True - executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr : \ - { - 'exitcode' : 0, - 'stdout' : "output - %s" % dir, - 'stderr' : "errors - %s" % dir, - } - result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr') - self.assertTrue(write_stack_version_method.called) - self.assertEquals(result['exitcode'],0) - self.assertEquals(result['stdout'],'output - pre-upgrade.d\noutput - upgrade.d\noutput - post-upgrade.d') - self.assertEquals(result['stderr'],'errors - pre-upgrade.d\nerrors - upgrade.d\nerrors - post-upgrade.d') - # Checking failed result - write_stack_version_method.reset() - command = { - 'commandParams' : { - 'source_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.0.1\"}', - 'target_stack_version' : '{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}', - }, - 'role' : 'HDFS' - } - isdir_method.return_value = True - executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr :\ - { - 'exitcode' : 1, - 'stdout' : "output - %s" % dir, - 'stderr' : "errors - %s" % dir, - } - result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr') - self.assertTrue(write_stack_version_method.called) - self.assertEquals(result['exitcode'],1) - self.assertEquals(result['stdout'],'output - pre-upgrade.d') - self.assertEquals(result['stderr'],'errors - pre-upgrade.d') - - - def test_get_key_func(self): - executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor', - 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig()) - # Checking unparseable - self.assertEqual(executor.get_key_func('fdsfds'), 999) - self.assertEqual(executor.get_key_func('99dfsfd'), 999) - self.assertEqual(executor.get_key_func('-fdfds'), 999) - # checking parseable - self.assertEqual(executor.get_key_func('99'), 99) - self.assertEqual(executor.get_key_func('45-install'), 45) - self.assertEqual(executor.get_key_func('33-install-staff'), 33) - #checking sorting of full list - testlist1 = ['7-fdfd', '10-erewfds', '11-fdfdfd', '1-hh', '20-kk', '01-tt'] - testlist1.sort(key = executor.get_key_func) - self.assertEqual(testlist1, - ['1-hh', '01-tt', '7-fdfd', '10-erewfds', '11-fdfdfd', '20-kk']) - - - def test_split_stack_version(self): - executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor', - 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig()) - result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.2.1\"}') - self.assertEquals(result, ('HDP', '1', '2')) - result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1.3\"}') - self.assertEquals(result, ('HDP', '1', '3')) - result = executor.split_stack_version('{\"stackName\":\"ComplexStackVersion\",\"stackVersion\":\"1.3.4.2.2\"}') - self.assertEquals(result, ('ComplexStackVersion', '1', '3')) - result = executor.split_stack_version('{\"stackName\":\"HDP\",\"stackVersion\":\"1\"}') - self.assertEquals(result, None) - pass - - - @patch('os.listdir') - @patch('os.path.isdir') - @patch.object(UpgradeExecutor.UpgradeExecutor, 'get_key_func') - def test_execute_dir(self, get_key_func_method, isdir_method, listdir_method): - pythonExecutor = MagicMock() - puppetExecutor = MagicMock() - - command = {'debug': 'command'} - isdir_method.return_value = True - # Mocking sort() method of list - class MyList(list): - pass - files = MyList(['first.py', 'second.pp', 'third.py', 'fourth.nm', - 'fifth-failing.py', 'six.py']) - files.sort = lambda key: None - listdir_method.return_value = files - # fifth-failing.py will fail - pythonExecutor.run_file.side_effect = [ - {'exitcode' : 0, - 'stdout' : "stdout - first.py", - 'stderr' : "stderr - first.py"}, - {'exitcode' : 0, - 'stdout' : "stdout - third.py", - 'stderr' : "stderr - third.py"}, - {'exitcode' : 1, - 'stdout' : "stdout - fifth-failing.py", - 'stderr' : "stderr - fifth-failing.py"}, - {'exitcode' : 0, - 'stdout' : "stdout - six.py", - 'stderr' : "stderr - six.py"}, - ] - puppetExecutor.run_manifest.side_effect = [ - {'exitcode' : 0, - 'stdout' : "stdout - second.pp", - 'stderr' : "stderr - second.pp"}, - ] - - executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor, - puppetExecutor, AmbariConfig.AmbariConfig().getConfig()) - - result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr') - self.assertEquals(result['exitcode'],1) - self.assertEquals(result['stdout'],"\nstdout - first.py\nstdout - second.pp\nstdout - third.py\nUnrecognized file type, skipping: basedir/dir/fourth.nm\nstdout - fifth-failing.py") - self.assertEquals(result['stderr'],"\nstderr - first.py\nstderr - second.pp\nstderr - third.py\nNone\nstderr - fifth-failing.py") - - - @patch('os.path.isdir') - def test_execute_dir_not_existing(self, isdir_method): - pythonExecutor = MagicMock() - puppetExecutor = MagicMock() - - command = {'debug': 'command'} - isdir_method.return_value = False - - executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor, - puppetExecutor, AmbariConfig.AmbariConfig().getConfig()) - - result= executor.execute_dir(command, 'basedir', 'not_existing_dir', 'tmpout', 'tmperr') - self.assertEquals(result['exitcode'],0) - self.assertEquals(result['stdout'],'Script directory basedir/not_existing_dir does not exist, skipping') - self.assertEquals(result['stderr'],'None') - - - @patch('os.listdir') - @patch('os.path.isdir') - def test_execute_dir_ignore_badly_named(self, isdir_method, listdir_method): - pythonExecutor = MagicMock() - puppetExecutor = MagicMock() - - command = {'debug': 'command'} - isdir_method.return_value = True - files = ['00-first.py', 'badly-named.pp', '10-second.pp', '20-wrong.cpp'] - listdir_method.return_value = files - # fifth-failing.py will fail - pythonExecutor.run_file.side_effect = [ - {'exitcode' : 0, - 'stdout' : "stdout - python.py", - 'stderr' : "stderr - python.py"}, - ] - puppetExecutor.run_manifest.side_effect = [ - {'exitcode' : 0, - 'stdout' : "stdout - puppet.pp", - 'stderr' : "stderr - puppet.pp"}, - ] - - executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor, - puppetExecutor, AmbariConfig.AmbariConfig().getConfig()) - - result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr') - self.assertEquals(result['exitcode'],0) - self.assertEquals(result['stdout'],'\nstdout - python.py\nstdout - puppet.pp\nUnrecognized file type, skipping: basedir/dir/20-wrong.cpp') - self.assertEquals(result['stderr'],'\nstderr - python.py\nstderr - puppet.pp\nNone') - -if __name__ == "__main__": - unittest.main(verbosity=2) \ No newline at end of file
