Updated Branches: refs/heads/trunk a67fb7cb3 -> 98adf5947
AMBARI-3586. Introduce CustomServiceOrchestrator and basic script classes (dlysnichenko) Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/98adf594 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/98adf594 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/98adf594 Branch: refs/heads/trunk Commit: 98adf59479cd1be9bb62ff09d29abc7696f252ef Parents: a67fb7c Author: Lisnichenko Dmitro <[email protected]> Authored: Thu Oct 24 22:37:34 2013 +0300 Committer: Lisnichenko Dmitro <[email protected]> Committed: Thu Oct 24 22:37:34 2013 +0300 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 + ambari-agent/pom.xml | 10 +- .../src/main/python/ambari_agent/ActionQueue.py | 15 ++- .../main/python/ambari_agent/AgentException.py | 24 ++++ .../main/python/ambari_agent/AmbariConfig.py | 1 + .../ambari_agent/CustomServiceOrchestrator.py | 132 +++++++++++++++++++ .../src/main/python/ambari_agent/FileCache.py | 66 ++++++++++ .../main/python/ambari_agent/PythonExecutor.py | 33 +++-- .../main/python/resource_management/script.py | 102 ++++++++++++++ .../python/TestCustomServiceOrchestrator.py | 54 ++++++++ .../src/test/python/TestPythonExecutor.py | 28 ++-- 11 files changed, 433 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 0616a0e..ca94ba1 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -24,6 +24,7 @@ loglevel=INFO data_cleanup_interval=86400 data_cleanup_max_age=2592000 ping_port=8670 +cache_dir=/var/lib/ambari-agent/cache [stack] installprefix=/var/ambari-agent/ http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml index 1b3c579..e05eba4 100644 --- a/ambari-agent/pom.xml +++ b/ambari-agent/pom.xml @@ -351,7 +351,15 @@ </source> </sources> </mapping> - <!-- --> + <mapping> + <!-- TODO: Remove when we introduce metadata downloading by agent--> + <directory>/var/lib/ambari-agent/cache/stacks</directory> + <sources> + <source> + <location>../ambari-server/src/main/resources/stacks</location> + </source> + </sources> + </mapping> </mappings> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index a2ad9c5..2881997 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -47,8 +47,13 @@ class ActionQueue(threading.Thread): STATUS_COMMAND = 'STATUS_COMMAND' EXECUTION_COMMAND = 'EXECUTION_COMMAND' + ROLE_COMMAND_INSTALL = 'INSTALL' + ROLE_COMMAND_START = 'START' + ROLE_COMMAND_STOP = 'STOP' IN_PROGRESS_STATUS = 'IN_PROGRESS' + COMPLETED_STATUS = 'COMPLETED' + FAILED_STATUS = 'FAILED' def __init__(self, config, controller): super(ActionQueue, self).__init__() @@ -119,6 +124,7 @@ class ActionQueue(threading.Thread): 'status': self.IN_PROGRESS_STATUS }) self.commandStatuses.put_command_status(command, in_progress_status) + # TODO: Add CustomServiceOrchestrator call somewhere here # running command # Create a new instance of executor for the current thread puppetExecutor = PuppetExecutor.PuppetExecutor( @@ -128,10 +134,11 @@ class ActionQueue(threading.Thread): self.config.get('agent', 'prefix'), self.config) commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'], in_progress_status['tmperr']) + # dumping results - status = "COMPLETED" + status = self.COMPLETED_STATUS if commandresult['exitcode'] != 0: - status = "FAILED" + status = self.FAILED_STATUS roleResult = self.commandStatuses.generate_report_template(command) # assume some puppet plumbing to run these commands roleResult.update({ @@ -146,13 +153,13 @@ class ActionQueue(threading.Thread): roleResult['stderr'] = 'None' # let ambari know that configuration tags were applied - if status == 'COMPLETED': + if status == self.COMPLETED_STATUS: configHandler = ActualConfigHandler(self.config) if command.has_key('configurationTags'): configHandler.write_actual(command['configurationTags']) roleResult['configurationTags'] = command['configurationTags'] - if command.has_key('roleCommand') and command['roleCommand'] == 'START': + if command.has_key('roleCommand') and command['roleCommand'] == self.ROLE_COMMAND_START: configHandler.copy_to_component(command['role']) roleResult['configurationTags'] = configHandler.read_actual_component(command['role']) self.commandStatuses.put_command_status(command, roleResult) http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/ambari_agent/AgentException.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AgentException.py b/ambari-agent/src/main/python/ambari_agent/AgentException.py new file mode 100644 index 0000000..d7455f5 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/AgentException.py @@ -0,0 +1,24 @@ +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +class AgentException(Exception): + def __init__(self, value): + self.parameter = value + + def __str__(self): + return repr(self.parameter) http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index dd064d5..ed507ce 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -34,6 +34,7 @@ prefix=/tmp/ambari-agent data_cleanup_interval=86400 data_cleanup_max_age=2592000 ping_port=8670 +cache_dir=/var/lib/ambari-agent/cache [services] http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py new file mode 100644 index 0000000..e6032cf --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import os +import json, pprint +import sys + +from FileCache import FileCache +from AgentException import AgentException +from PythonExecutor import PythonExecutor +from AmbariConfig import AmbariConfig + + +logger = logging.getLogger() + +class CustomServiceOrchestrator(): + """ + Executes a command for custom service. stdout and stderr are written to + tmpoutfile and to tmperrfile respectively. + """ + + SCRIPT_TYPE_PYTHON = "PYTHON" + + def __init__(self, config): + self.config = config + self.tmp_dir = config.get('agent', 'prefix') + self.file_cache = FileCache(config) + self.python_executor = PythonExecutor(self.tmp_dir, config) + + + def runCommand(self, command, tmpoutfile, tmperrfile): + try: + # TODO: Adjust variables + service_name = command['serviceName'] + component_name = command['role'] + stack_name = command['stackName'] # TODO: add at the server side + stack_version = command['stackVersion'] # TODO: add at the server side + script_type = command['scriptType'] # TODO: add at the server side + script = command['script'] + command_name = command['roleCommand'] + timeout = int(command['timeout']) # TODO: add at the server side + base_dir = self.file_cache.get_service_base_dir( + stack_name, stack_version, service_name, component_name) + script_path = self.resolve_script_path(base_dir, script, script_type) + if script_type == self.SCRIPT_TYPE_PYTHON: + json_path = self.dump_command_to_json(command) + script_params = [command_name, json_path, base_dir] + ret = self.python_executor.run_file( + script_path, script_params, tmpoutfile, tmperrfile, timeout) + else: + message = "Unknown script type {0}".format(script_type) + raise AgentException(message) + except Exception: # We do not want to let agent fail completely + exc_type, exc_obj, exc_tb = sys.exc_info() + message = "Catched an exception while executing "\ + "custom service command: {0}: {1}".format(exc_type, exc_obj) + logger.error(message) + ret = { + 'stdout' : message, + 'stderr' : message, + 'exitCode': 1, + } + return ret + + + def resolve_script_path(self, base_dir, script, script_type): + """ + Incapsulates logic of script location determination. + """ + path = os.path.join(base_dir, "package", script) + if not os.path.exists(path): + message = "Script {0} does not exist".format(path) + raise AgentException(message) + return path + + + def dump_command_to_json(self, command): + """ + Converts command to json file and returns file path + """ + command_id = command['commandId'] + file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(command_id)) + with open(file_path, "w") as f: + content = json.dumps(command) + f.write(content) + return file_path + + +def main(): + """ + May be used for manual testing if needed + """ + config = AmbariConfig().getConfig() + orchestrator = CustomServiceOrchestrator(config) + config.set('agent', 'prefix', "/tmp") + command = { + "serviceName" : "HBASE", + "role" : "HBASE_MASTER", + "stackName" : "HDP", + "stackVersion" : "1.2.0", + "scriptType" : "PYTHON", + "script" : "/tmp/1.py", + "roleCommand" : "START", + "timeout": 600 + } + + result = orchestrator.runCommand(command, "/tmp/out-1.txt", "/tmp/err-1.txt") + pprint.pprint(result) + pass + + + +if __name__ == "__main__": + main() http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/ambari_agent/FileCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py new file mode 100644 index 0000000..dbef708 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + + +# TODO: Update class description + + +import logging +import Queue +import threading +import pprint +import os +import json +from ServiceComponentMetadata import ServiceComponentMetadata +from AgentException import AgentException + +logger = logging.getLogger() + +class FileCache(): + """ + Provides caching and lookup for service metadata files. + If service metadata is not available at cache, + downloads relevant files from the server. + """ + + def __init__(self, config): + self.service_component_pool = {} + self.config = config + self.cache_dir = config.get('agent', 'cache_dir') + + def get_service_base_dir(self, stack_name, stack_version, service, component): + """ + Returns a base directory for service + """ + metadata_path = os.path.join(self.cache_dir, "stacks", str(stack_name), + str(stack_version), str(service)) + if not os.path.isdir(metadata_path): + # TODO: Metadata downloading will be implemented at Phase 2 + # As of now, all stack definitions are packaged and distributed with + # agent rpm + message = "Metadata dir for not found for a service " \ + "(stackName = {0}, stackVersion = {1}, " \ + "service = {2}, " \ + "component = {3}".format(stack_name, stack_version, + service, component) + raise AgentException(message) + return metadata_path + + http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py index 566980e..a716c84 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py @@ -29,9 +29,11 @@ import shell logger = logging.getLogger() class PythonExecutor: - - # How many seconds will pass before running puppet is terminated on timeout - PYTHON_TIMEOUT_SECONDS = 600 + """ + Performs functionality for executing python scripts. + Warning: class maintains internal state. As a result, instances should not be + used as a singleton for a concurrent execution of python scripts + """ NO_ERROR = "none" grep = Grep() @@ -43,22 +45,25 @@ class PythonExecutor: self.config = config pass - def run_file(self, command, file, tmpoutfile, tmperrfile): + def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout): """ Executes the specified python file in a separate subprocess. Method returns only when the subprocess is finished. + Params arg is a list of script parameters + Timeout meaning: how many seconds should pass before script execution + is forcibly terminated """ tmpout = open(tmpoutfile, 'w') tmperr = open(tmperrfile, 'w') - pythonCommand = self.pythonCommand(file) + pythonCommand = self.pythonCommand(script, script_params) logger.info("Running command " + pprint.pformat(pythonCommand)) - process = self.lauch_python_subprocess(pythonCommand, tmpout, tmperr) + process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr) logger.debug("Launching watchdog thread") self.event.clear() self.python_process_has_been_killed = False - thread = Thread(target = self.python_watchdog_func, args = (process, )) + thread = Thread(target = self.python_watchdog_func, args = (process, timeout)) thread.start() - # Waiting for process to finished or killed + # Waiting for the process to be either finished or killed process.communicate() self.event.set() thread.join() @@ -68,14 +73,14 @@ class PythonExecutor: out = open(tmpoutfile, 'r').read() error = open(tmperrfile, 'r').read() if self.python_process_has_been_killed: - error = str(error) + "\n Puppet has been killed due to timeout" + error = str(error) + "\n Python script has been killed due to timeout" returncode = 999 result = self.condenseOutput(out, error, returncode) logger.info("Result: %s" % result) return result - def lauch_python_subprocess(self, command, tmpout, tmperr): + def launch_python_subprocess(self, command, tmpout, tmperr): """ Creates subprocess with given parameters. This functionality was moved to separate method to make possible unit testing @@ -87,8 +92,8 @@ class PythonExecutor: def isSuccessfull(self, returncode): return not self.python_process_has_been_killed and returncode == 0 - def pythonCommand(self, file): - puppetcommand = ['python', file] + def pythonCommand(self, script, script_params): + puppetcommand = ['python', script] + script_params return puppetcommand def condenseOutput(self, stdout, stderr, retcode): @@ -100,8 +105,8 @@ class PythonExecutor: } return result - def python_watchdog_func(self, python): - self.event.wait(self.PYTHON_TIMEOUT_SECONDS) + def python_watchdog_func(self, python, timeout): + self.event.wait(timeout) if python.returncode is None: logger.error("Subprocess timed out and will be killed") shell.kill_process_with_children(python.pid) http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/main/python/resource_management/script.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/resource_management/script.py b/ambari-agent/src/main/python/resource_management/script.py new file mode 100644 index 0000000..d64010a --- /dev/null +++ b/ambari-agent/src/main/python/resource_management/script.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import sys +import json +import logging + +from resource_management.environment import Environment +from resource_management.exceptions import Fail + + +class Script(): + """ + Executes a command for custom service. stdout and stderr are written to + tmpoutfile and to tmperrfile respectively. + """ + + def __init__(self): + pass + + + def start(self, env, params): # TODO: just for test runs; remove + env.set_prefixes("ddd") + print "Start!" + pass + + + def execute(self): + """ + Sets up logging; + Parses command parameters and executes method relevant to command type + """ + # set up logging (two separate loggers for stderr and stdout with different loglevels) + logger = logging.getLogger('resource_management') + logger.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + chout = logging.StreamHandler(sys.stdout) + chout.setLevel(logging.DEBUG) + chout.setFormatter(formatter) + cherr = logging.StreamHandler(sys.stderr) + cherr.setLevel(logging.ERROR) + cherr.setFormatter(formatter) + logger.addHandler(cherr) + # parse arguments + if len(sys.argv) < 1+3: + logger.error("Script expects at least 3 arguments") + sys.exit(1) + command_type = str.lower(sys.argv[1]) + # parse command parameters + command_data_file = sys.argv[2] + basedir = sys.argv[3] + try: + with open(command_data_file, "r") as f: + pass + params = json.load(f) + except IOError: + logger.exception("Can not read json file with command parameters: ") + sys.exit(1) + # Run class method mentioned by a command type + self_methods = dir(self) + if not command_type in self_methods: + logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_type)) + sys.exit(1) + method = getattr(self, command_type) + try: + with Environment(basedir, params) as env: + method(env, params) + env.run() + except Fail: + logger.exception("Got exception while executing method '{0}':".format(command_type)) + sys.exit(1) + + + + def fail_with_error(self, message): + """ + Prints error message and exits with non-zero exit code + """ + print("Error: " + message) + sys.stderr.write("Error: " + message) + sys.exit(1) + + +if __name__ == "__main__": + Script().execute() http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/test/python/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/TestCustomServiceOrchestrator.py new file mode 100644 index 0000000..e4a430d --- /dev/null +++ b/ambari-agent/src/test/python/TestCustomServiceOrchestrator.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +import ConfigParser + +import pprint + +from unittest import TestCase +import threading +import tempfile +import time +from threading import Thread + +from PythonExecutor import PythonExecutor +from AmbariConfig import AmbariConfig +from mock.mock import MagicMock, patch +import StringIO +import sys + + +class TestCustomServiceOrchestrator(TestCase): + + def setUp(self): + # disable stdout + out = StringIO.StringIO() + sys.stdout = out + # generate sample config + tmpdir = tempfile.gettempdir() + config = ConfigParser.RawConfigParser() + config.add_section('agent') + config.set('agent', 'prefix', tmpdir) + + + def tearDown(self): + # enable stdout + sys.stdout = sys.__stdout__ + + http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/98adf594/ambari-agent/src/test/python/TestPythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/TestPythonExecutor.py b/ambari-agent/src/test/python/TestPythonExecutor.py index 69dc349..c27c0f5 100644 --- a/ambari-agent/src/test/python/TestPythonExecutor.py +++ b/ambari-agent/src/test/python/TestPythonExecutor.py @@ -42,20 +42,20 @@ class TestPythonExecutor(TestCase): executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) _, tmpoutfile = tempfile.mkstemp() _, tmperrfile = tempfile.mkstemp() - executor.PYTHON_TIMEOUT_SECONDS = 0.1 + PYTHON_TIMEOUT_SECONDS = 0.1 kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate() - def lauch_python_subprocess_method(command, tmpout, tmperr): + def launch_python_subprocess_method(command, tmpout, tmperr): subproc_mock.tmpout = tmpout subproc_mock.tmperr = tmperr return subproc_mock - executor.lauch_python_subprocess = lauch_python_subprocess_method + executor.launch_python_subprocess = launch_python_subprocess_method runShellKillPgrp_method = MagicMock() runShellKillPgrp_method.side_effect = lambda python : python.terminate() executor.runShellKillPgrp = runShellKillPgrp_method subproc_mock.returncode = None - thread = Thread(target = executor.run_file, args = ("fake_command", - "fake_puppetFile", tmpoutfile, tmperrfile)) + thread = Thread(target = executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"], + tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS)) thread.start() time.sleep(0.1) subproc_mock.finished_event.wait() @@ -70,19 +70,19 @@ class TestPythonExecutor(TestCase): executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) _, tmpoutfile = tempfile.mkstemp() _, tmperrfile = tempfile.mkstemp() - executor.PYTHON_TIMEOUT_SECONDS = 5 + PYTHON_TIMEOUT_SECONDS = 5 - def lauch_python_subprocess_method(command, tmpout, tmperr): + def launch_python_subprocess_method(command, tmpout, tmperr): subproc_mock.tmpout = tmpout subproc_mock.tmperr = tmperr return subproc_mock - executor.lauch_python_subprocess = lauch_python_subprocess_method + executor.launch_python_subprocess = launch_python_subprocess_method runShellKillPgrp_method = MagicMock() runShellKillPgrp_method.side_effect = lambda python : python.terminate() executor.runShellKillPgrp = runShellKillPgrp_method subproc_mock.returncode = 0 - thread = Thread(target = executor.run_file, args = ("fake_command", - "fake_puppetFile", tmpoutfile, tmperrfile)) + thread = Thread(target = executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"], + tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS)) thread.start() time.sleep(0.1) subproc_mock.should_finish_event.set() @@ -96,19 +96,19 @@ class TestPythonExecutor(TestCase): executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) _, tmpoutfile = tempfile.mkstemp() _, tmperrfile = tempfile.mkstemp() - executor.PYTHON_TIMEOUT_SECONDS = 5 + PYTHON_TIMEOUT_SECONDS = 5 - def lauch_python_subprocess_method(command, tmpout, tmperr): + def launch_python_subprocess_method(command, tmpout, tmperr): subproc_mock.tmpout = tmpout subproc_mock.tmperr = tmperr return subproc_mock - executor.lauch_python_subprocess = lauch_python_subprocess_method + executor.launch_python_subprocess = launch_python_subprocess_method runShellKillPgrp_method = MagicMock() runShellKillPgrp_method.side_effect = lambda python : python.terminate() executor.runShellKillPgrp = runShellKillPgrp_method subproc_mock.returncode = 0 subproc_mock.should_finish_event.set() - result = executor.run_file("command", "file", tmpoutfile, tmperrfile) + result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS) self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output'})
