AMBARI-5934. Provide ability to rebalance HDFS.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cb662f49 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cb662f49 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cb662f49 Branch: refs/heads/trunk Commit: cb662f494f27b0ffc2058151f550b0cb08572db1 Parents: 70588f5 Author: Mahadev Konar <[email protected]> Authored: Fri Aug 8 10:10:17 2014 -0700 Committer: Mahadev Konar <[email protected]> Committed: Fri Aug 8 10:10:21 2014 -0700 ---------------------------------------------------------------------- .gitignore | 1 + .../src/main/python/ambari_agent/ActionQueue.py | 84 +- .../BackgroundCommandExecutionHandle.py | 44 + .../python/ambari_agent/CommandStatusDict.py | 23 +- .../ambari_agent/CustomServiceOrchestrator.py | 59 +- .../main/python/ambari_agent/PythonExecutor.py | 128 ++- .../test/python/ambari_agent/TestActionQueue.py | 215 +++- .../TestCustomServiceOrchestrator.py | 34 + ambari-server/pom.xml | 2 + .../server/actionmanager/ActionScheduler.java | 66 +- .../ambari/server/agent/AgentCommand.java | 1 + .../ambari/server/agent/HeartBeatHandler.java | 1 + .../server/api/services/AmbariMetaInfo.java | 23 +- .../server/api/util/StackExtensionHelper.java | 14 +- .../AmbariCustomCommandExecutionHelper.java | 14 +- .../ambari/server/state/ComponentInfo.java | 8 + .../ambari/server/state/ConfigHelper.java | 11 +- .../server/state/CustomCommandDefinition.java | 7 +- .../apache/ambari/server/state/ServiceInfo.java | 14 +- .../system_action_definitions.xml | 20 +- .../custom_actions/ambari_hdfs_rebalancer.py | 59 - .../custom_actions/cancel_background_task.py | 41 + .../stacks/HDP/2.0.6/services/HDFS/metainfo.xml | 8 + .../scripts/balancer-emulator/balancer-err.log | 1032 ++++++++++++++++++ .../scripts/balancer-emulator/balancer.log | 29 + .../scripts/balancer-emulator/hdfs-command.py | 45 + .../HDFS/package/scripts/hdfs_rebalance.py | 130 +++ .../services/HDFS/package/scripts/namenode.py | 52 + .../services/HDFS/package/scripts/params.py | 1 + .../actionmanager/TestActionScheduler.java | 83 ++ .../server/api/services/AmbariMetaInfoTest.java | 11 +- .../AmbariManagementControllerTest.java | 3 +- .../BackgroundCustomCommandExecutionTest.java | 275 +++++ .../ActionDefinitionManagerTest.java | 18 +- .../python/stacks/2.0.6/HDFS/test_namenode.py | 7 + .../2.0.6/configs/rebalancehdfs_default.json | 388 +++++++ .../cust_action_definitions1.xml | 10 - .../system_action_definitions.xml | 32 - .../stacks/HDP/2.0.5/services/HDFS/metainfo.xml | 9 + .../global/background_operations_controller.js | 6 + ambari-web/app/controllers/main/service/item.js | 75 +- ambari-web/app/messages.js | 8 + .../templates/common/host_progress_popup.hbs | 19 +- .../app/templates/common/prompt_popup.hbs | 5 +- ambari-web/app/utils/ajax/ajax.js | 50 + ambari-web/app/utils/helper.js | 3 + ambari-web/app/utils/host_progress_popup.js | 37 +- ambari-web/app/views/common/modal_popup.js | 2 + ambari-web/app/views/main/service/item.js | 11 +- 49 files changed, 2942 insertions(+), 276 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 0430303..cff27da 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ derby.log pass.txt ambari-agent/src/test/python/ambari_agent/dummy_files/current-stack velocity.log* +*.pydevproject http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index d3aad6e..6437036 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -26,11 +26,13 @@ import pprint import os import json +from AgentException import AgentException from LiveStatus import LiveStatus from shell import shellRunner from ActualConfigHandler import ActualConfigHandler from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator +from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle logger = logging.getLogger() @@ -52,9 +54,12 @@ class ActionQueue(threading.Thread): STATUS_COMMAND = 'STATUS_COMMAND' EXECUTION_COMMAND = 'EXECUTION_COMMAND' + BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND' + CANCEL_BACKGROUND_EXECUTION_COMMAND = 'CANCEL_BACKGROUND_EXECUTION_COMMAND' ROLE_COMMAND_INSTALL = 'INSTALL' ROLE_COMMAND_START = 'START' ROLE_COMMAND_STOP = 'STOP' + ROLE_COMMAND_CANCEL = 'CANCEL' ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND' CUSTOM_COMMAND_RESTART = 'RESTART' @@ -66,6 +71,7 @@ class ActionQueue(threading.Thread): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() self.statusCommandQueue = Queue.Queue() + self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = self.status_update_callback) self.config = config @@ -74,8 +80,7 @@ class ActionQueue(threading.Thread): self.configTags = {} self._stop = threading.Event() self.tmpdir = config.get('agent', 'prefix') - self.customServiceOrchestrator = CustomServiceOrchestrator(config, - controller) + self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.commandStatuses) def stop(self): @@ -106,7 +111,10 @@ class ActionQueue(threading.Thread): command['serviceName'] + " of cluster " + \ command['clusterName'] + " to the queue.") logger.debug(pprint.pformat(command)) - self.commandQueue.put(command) + if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND : + self.backgroundCommandQueue.put(self.createCommandHandle(command)) + else: + self.commandQueue.put(command) def cancel(self, commands): for command in commands: @@ -136,25 +144,45 @@ class ActionQueue(threading.Thread): def run(self): while not self.stopped(): - while not self.statusCommandQueue.empty(): - try: - command = self.statusCommandQueue.get(False) - self.process_command(command) - except (Queue.Empty): - pass + self.processBackgroundQueueSafeEmpty(); + self.processStatusCommandQueueSafeEmpty(); try: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) self.process_command(command) except (Queue.Empty): pass + def processBackgroundQueueSafeEmpty(self): + while not self.backgroundCommandQueue.empty(): + try: + command = self.backgroundCommandQueue.get(False) + if(command.has_key('__handle') and command['__handle'].status == None): + self.process_command(command) + except (Queue.Empty): + pass + + def processStatusCommandQueueSafeEmpty(self): + while not self.statusCommandQueue.empty(): + try: + command = self.statusCommandQueue.get(False) + self.process_command(command) + except (Queue.Empty): + pass + + + def createCommandHandle(self, command): + if(command.has_key('__handle')): + raise AgentException("Command already has __handle") + command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], self.on_background_command_started, self.on_background_command_complete_callback) + return command def process_command(self, command): logger.debug("Took an element of Queue: " + pprint.pformat(command)) # make sure we log failures + commandType = command['commandType'] try: - if command['commandType'] == self.EXECUTION_COMMAND: + if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]: self.execute_command(command) - elif command['commandType'] == self.STATUS_COMMAND: + elif commandType == self.STATUS_COMMAND: self.execute_status_command(command) else: logger.error("Unrecognized command " + pprint.pformat(command)) @@ -165,11 +193,11 @@ class ActionQueue(threading.Thread): def execute_command(self, command): ''' - Executes commands of type EXECUTION_COMMAND + Executes commands of type EXECUTION_COMMAND ''' clusterName = command['clusterName'] commandId = command['commandId'] - + isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND message = "Executing command with id = {commandId} for role = {role} of " \ "cluster {cluster}.".format( commandId = str(commandId), role=command['role'], @@ -189,13 +217,17 @@ class ActionQueue(threading.Thread): 'status': self.IN_PROGRESS_STATUS }) self.commandStatuses.put_command_status(command, in_progress_status) + # running command commandresult = self.customServiceOrchestrator.runCommand(command, in_progress_status['tmpout'], in_progress_status['tmperr']) + + # dumping results - status = self.COMPLETED_STATUS - if commandresult['exitcode'] != 0: - status = self.FAILED_STATUS + if isCommandBackground: + return + else: + status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else self.FAILED_STATUS roleResult = self.commandStatuses.generate_report_template(command) roleResult.update({ 'stdout': commandresult['stdout'], @@ -249,6 +281,26 @@ class ActionQueue(threading.Thread): self.commandStatuses.put_command_status(command, roleResult) + def on_background_command_started(self, handle): + #update command with given handle + self.commandStatuses.update_command_status(handle.command, {'pid' : handle.pid}) + + + def on_background_command_complete_callback(self, process_condenced_result, handle): + logger.debug('Start callback: %s' % process_condenced_result) + logger.debug('The handle is: %s' % handle) + status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS + roleResult = self.commandStatuses.generate_report_template(handle.command) + + roleResult.update({ + 'stdout': process_condenced_result['stdout'], + 'stderr': process_condenced_result['stderr'], + 'exitCode': process_condenced_result['exitcode'], + 'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '', + 'status': status, + }) + + self.commandStatuses.put_command_status(handle.command, roleResult) def execute_status_command(self, command): ''' http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py new file mode 100644 index 0000000..17b7ce5 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +import logging + +logger = logging.getLogger() +installScriptHash = -1 + +class BackgroundCommandExecutionHandle: + + SCHEDULED_STATUS = 'SCHEDULED' + RUNNING_STATUS = 'RUNNING' + STOP_REQUEST_STATUS = 'STOP_REQUEST' + STOPPED_STATUS = 'SCHEDULED' + + def __init__(self, command, commandId, on_background_command_started, on_background_command_complete_callback): + self.command = command + self.pid = 0 + self.status = None + self.exitCode = None + self.commandId = commandId + self.on_background_command_started = on_background_command_started + self.on_background_command_complete_callback = on_background_command_complete_callback + + + + def __str__(self): + return "[BackgroundHandle: pid='{0}', status='{1}', exitCode='{2}', commandId='{3}']".format(self.pid, self.status, self.exitCode, self.commandId) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index f00ada2..0ebc45e 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -21,6 +21,7 @@ limitations under the License. import json import logging import threading +import copy from Grep import Grep logger = logging.getLogger() @@ -58,7 +59,25 @@ class CommandStatusDict(): if not status_command: self.callback_action() - + def update_command_status(self, command, delta): + """ + Updates status of command without replacing (overwrites with delta value) + """ + if 'taskId' in command: + key = command['taskId'] + status_command = False + else: # Status command reports has no task id + key = id(command) + status_command = True + with self.lock: # Synchronized + self.current_state[key][1].update(delta) + if not status_command: + self.callback_action() + + def get_command_status(self, taskId): + with self.lock: + c = copy.copy(self.current_state[taskId][1]) + return c def generate_report(self): """ Generates status reports about commands that are IN_PROGRESS, COMPLETE or @@ -72,7 +91,7 @@ class CommandStatusDict(): for key, item in self.current_state.items(): command = item[0] report = item[1] - if command ['commandType'] == ActionQueue.EXECUTION_COMMAND: + if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]: if (report['status']) != ActionQueue.IN_PROGRESS_STATUS: resultReports.append(report) # Removing complete/failed command status from dict http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index e13e543..093fc22 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -50,7 +50,7 @@ class CustomServiceOrchestrator(): PING_PORTS_KEY = "all_ping_ports" AMBARI_SERVER_HOST = "ambari_server_host" - def __init__(self, config, controller): + def __init__(self, config, controller, commandStatuses = None): self.config = config self.tmp_dir = config.get('agent', 'prefix') self.exec_tmp_dir = config.get('agent', 'tmp_dir') @@ -63,6 +63,8 @@ class CustomServiceOrchestrator(): self.public_fqdn = hostname.public_hostname(config) # cache reset will be called on every agent registration controller.registration_listeners.append(self.file_cache.reset) + + self.commandStatuses = commandStatuses # Clean up old status command files if any try: os.unlink(self.status_commands_stdout) @@ -93,6 +95,8 @@ class CustomServiceOrchestrator(): script_type = command['commandParams']['script_type'] script = command['commandParams']['script'] timeout = int(command['commandParams']['command_timeout']) + before_interceptor_method = command['commandParams']['before_system_hook_function'] if command['commandParams'].has_key('before_system_hook_function') else None + if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']: server_url_prefix = command['hostLevelParams']['jdk_location'] else: @@ -110,6 +114,12 @@ class CustomServiceOrchestrator(): if command_name == self.CUSTOM_ACTION_COMMAND: base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix) script_tuple = (os.path.join(base_dir, script) , base_dir) + + # Call systemHook functions in current virtual machine. This function can enrich custom action + # command with some information from current machine. And can be considered as plugin + if before_interceptor_method != None: + self.processSystemHookFunctions(script_tuple, before_interceptor_method, command) + hook_dir = None else: if command_name == self.CUSTOM_COMMAND_COMMAND: @@ -127,6 +137,11 @@ class CustomServiceOrchestrator(): message = "Unknown script type {0}".format(script_type) raise AgentException(message) # Execute command using proper interpreter + handle = None + if(command.has_key('__handle')): + handle = command['__handle'] + del command['__handle'] + json_path = self.dump_command_to_json(command) pre_hook_tuple = self.resolve_hook_script_path(hook_dir, self.PRE_HOOK_PREFIX, command_name, script_type) @@ -141,12 +156,16 @@ class CustomServiceOrchestrator(): # Executing hooks and script ret = None + from ActionQueue import ActionQueue + if(command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1): + raise AgentException("Background commands are supported without hooks only") + for py_file, current_base_dir in filtered_py_file_list: script_params = [command_name, json_path, current_base_dir] ret = self.python_executor.run_file(py_file, script_params, self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout, tmpstrucoutfile, logger_level, self.map_task_to_process, - task_id, override_output_files) + task_id, override_output_files, handle = handle) # Next run_file() invocations should always append to current output override_output_files = False if ret['exitcode'] != 0: @@ -156,16 +175,17 @@ class CustomServiceOrchestrator(): raise AgentException("No script has been executed") # if canceled - pid = self.commands_in_progress.pop(task_id) - if not isinstance(pid, int): - reason = '\nCommand aborted. ' + pid - ret['stdout'] += reason - ret['stderr'] += reason - - with open(tmpoutfile, "a") as f: - f.write(reason) - with open(tmperrfile, "a") as f: - f.write(reason) + if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO) + pid = self.commands_in_progress.pop(task_id) + if not isinstance(pid, int): + reason = '\nCommand aborted. ' + pid + ret['stdout'] += reason + ret['stderr'] += reason + + with open(tmpoutfile, "a") as f: + f.write(reason) + with open(tmperrfile, "a") as f: + f.write(reason) except Exception: # We do not want to let agent fail completely exc_type, exc_obj, exc_tb = sys.exc_info() @@ -180,7 +200,20 @@ class CustomServiceOrchestrator(): } return ret - + def fetch_bg_pid_by_taskid(self,command): + cancel_command_pid = None + try: + cancelTaskId = int(command['commandParams']['cancel_task_id']) + status = self.commandStatuses.get_command_status(cancelTaskId) + cancel_command_pid = status['pid'] + except Exception: + pass + logger.info("Found PID=%s for cancel taskId=%s" % (cancel_command_pid,cancelTaskId)) + command['commandParams']['cancel_command_pid'] = cancel_command_pid + + def processSystemHookFunctions(self, script_tuple, before_interceptor_method, command): + getattr(self, before_interceptor_method)(command) + def requestComponentStatus(self, command): """ Component status is determined by exit code, returned by runCommand(). http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py index 704e8f3..d130497 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py @@ -24,6 +24,9 @@ import subprocess import pprint import threading from threading import Thread +import time +from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle + from Grep import Grep import shell, sys @@ -36,7 +39,6 @@ class PythonExecutor: Warning: class maintains internal state. As a result, instances should not be used as a singleton for a concurrent execution of python scripts """ - NO_ERROR = "none" grep = Grep() event = threading.Event() @@ -47,9 +49,19 @@ class PythonExecutor: self.config = config pass + + def open_subporcess_files(self, tmpoutfile, tmperrfile, override_output_files): + if override_output_files: # Recreate files + tmpout = open(tmpoutfile, 'w') + tmperr = open(tmperrfile, 'w') + else: # Append to files + tmpout = open(tmpoutfile, 'a') + tmperr = open(tmperrfile, 'a') + return tmpout, tmperr + def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, timeout, tmpstructedoutfile, logger_level, callback, task_id, - override_output_files = True): + override_output_files = True, handle = None): """ Executes the specified python file in a separate subprocess. Method returns only when the subprocess is finished. @@ -59,13 +71,6 @@ class PythonExecutor: override_output_files option defines whether stdout/stderr files will be recreated or appended """ - if override_output_files: # Recreate files - tmpout = open(tmpoutfile, 'w') - tmperr = open(tmperrfile, 'w') - else: # Append to files - tmpout = open(tmpoutfile, 'a') - tmperr = open(tmperrfile, 'a') - # need to remove this file for the following case: # status call 1 does not write to file; call 2 writes to file; # call 3 does not write to file, so contents are still call 2's result @@ -77,45 +82,58 @@ class PythonExecutor: script_params += [tmpstructedoutfile, logger_level, tmp_dir] pythonCommand = self.python_command(script, script_params) logger.info("Running command " + pprint.pformat(pythonCommand)) - process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr) - # map task_id to pid - callback(task_id, process.pid) - logger.debug("Launching watchdog thread") - self.event.clear() - self.python_process_has_been_killed = False - thread = Thread(target = self.python_watchdog_func, args = (process, timeout)) - thread.start() - # Waiting for the process to be either finished or killed - process.communicate() - self.event.set() - thread.join() + if(handle == None) : + tmpout, tmperr = self.open_subporcess_files(tmpoutfile, tmperrfile, override_output_files) + + process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr) + # map task_id to pid + callback(task_id, process.pid) + logger.debug("Launching watchdog thread") + self.event.clear() + self.python_process_has_been_killed = False + thread = Thread(target = self.python_watchdog_func, args = (process, timeout)) + thread.start() + # Waiting for the process to be either finished or killed + process.communicate() + self.event.set() + thread.join() + return self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile) + else: + holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle) + + background = BackgroundThread(holder, self) + background.start() + return {"exitcode": 777} + + def prepare_process_result (self, process, tmpoutfile, tmperrfile, tmpstructedoutfile): + out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile) # Building results - error = self.NO_ERROR returncode = process.returncode - out = open(tmpoutfile, 'r').read() - error = open(tmperrfile, 'r').read() + if self.python_process_has_been_killed: + error = str(error) + "\n Python script has been killed due to timeout" + returncode = 999 + result = self.condenseOutput(out, error, returncode, structured_out) + logger.info("Result: %s" % result) + return result + + def read_result_from_files(self, out_path, err_path, structured_out_path): + out = open(out_path, 'r').read() + error = open(err_path, 'r').read() try: - with open(tmpstructedoutfile, 'r') as fp: + with open(structured_out_path, 'r') as fp: structured_out = json.load(fp) except Exception: - if os.path.exists(tmpstructedoutfile): - errMsg = 'Unable to read structured output from ' + tmpstructedoutfile + if os.path.exists(structured_out_path): + errMsg = 'Unable to read structured output from ' + structured_out_path structured_out = { 'msg' : errMsg } logger.warn(structured_out) else: structured_out = {} - - if self.python_process_has_been_killed: - error = str(error) + "\n Python script has been killed due to timeout" - returncode = 999 - result = self.condenseOutput(out, error, returncode, structured_out) - logger.info("Result: %s" % result) - return result - - + return out, error, structured_out + def launch_python_subprocess(self, command, tmpout, tmperr): """ Creates subprocess with given parameters. This functionality was moved to separate method @@ -124,7 +142,7 @@ class PythonExecutor: return subprocess.Popen(command, stdout=tmpout, stderr=tmperr, close_fds=True) - + def isSuccessfull(self, returncode): return not self.python_process_has_been_killed and returncode == 0 @@ -153,3 +171,39 @@ class PythonExecutor: shell.kill_process_with_children(python.pid) self.python_process_has_been_killed = True pass + +class Holder: + def __init__(self, command, out_file, err_file, structured_out_file, handle): + self.command = command + self.out_file = out_file + self.err_file = err_file + self.structured_out_file = structured_out_file + self.handle = handle + +class BackgroundThread(threading.Thread): + def __init__(self, holder, pythonExecutor): + threading.Thread.__init__(self) + self.holder = holder + self.pythonExecutor = pythonExecutor + + def run(self): + process_out, process_err = self.pythonExecutor.open_subporcess_files(self.holder.out_file, self.holder.err_file, True) + + logger.info("Starting process command %s" % self.holder.command) + process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err) + + logger.info("Process has been started. Pid = %s" % process.pid) + + self.holder.handle.pid = process.pid + self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS + self.holder.handle.on_background_command_started(self.holder.handle) + + process.communicate() + + self.holder.handle.exitCode = process.returncode + process_condenced_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file) + logger.info("Calling callback with args %s" % process_condenced_result) + self.holder.handle.on_background_command_complete_callback(process_condenced_result, self.holder.handle) + logger.info("Exiting from thread for holder pid %s" % self.holder.handle.pid) + + http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index e06efe4..4447670 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading, json import StringIO import sys from threading import Thread +import copy from mock.mock import patch, MagicMock, call from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler @@ -34,13 +35,11 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.PythonExecutor import PythonExecutor from ambari_agent.CommandStatusDict import CommandStatusDict from ambari_agent.ActualConfigHandler import ActualConfigHandler +from FileCache import FileCache class TestActionQueue(TestCase): - def setUp(self): - out = StringIO.StringIO() - sys.stdout = out # save original open() method for later use self.original_open = open @@ -155,6 +154,49 @@ class TestActionQueue(TestCase): 'hostLevelParams': {} } + background_command = { + 'commandType': 'BACKGROUND_EXECUTION_COMMAND', + 'role': 'NAMENODE', + 'roleCommand': 'CUSTOM_COMMAND', + 'commandId': '1-1', + 'taskId': 19, + 'clusterName': 'c1', + 'serviceName': 'HDFS', + 'configurations':{'global' : {}}, + 'configurationTags':{'global' : { 'tag': 'v123' }}, + 'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'}, + 'commandParams' : { + 'script_type' : 'PYTHON', + 'script' : 'script.py', + 'command_timeout' : '600', + 'jdk_location' : '.', + 'service_package_folder' : '.' + } + } + cancel_background_command = { + 'commandType': 'EXECUTION_COMMAND', + 'role': 'NAMENODE', + 'roleCommand': 'ACTIONEXECUTE', + 'commandId': '1-1', + 'taskId': 20, + 'clusterName': 'c1', + 'serviceName': 'HDFS', + 'configurations':{'global' : {}}, + 'configurationTags':{'global' : {}}, + 'hostLevelParams':{}, + 'commandParams' : { + 'script_type' : 'PYTHON', + 'script' : 'cancel_background_task.py', + 'before_system_hook_function' : 'fetch_bg_pid_by_taskid', + 'jdk_location' : '.', + 'command_timeout' : '600', + 'service_package_folder' : '.', + 'cancel_policy': 'SIGKILL', + 'cancel_task_id': "19", + } + } + + @patch.object(ActionQueue, "process_command") @patch.object(Queue, "get") @patch.object(CustomServiceOrchestrator, "__init__") @@ -526,3 +568,170 @@ class TestActionQueue(TestCase): actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') + @patch.object(StackVersionsFileHandler, "read_stack_version") + @patch.object(CustomServiceOrchestrator, "runCommand") + @patch.object(CustomServiceOrchestrator, "__init__") + def test_execute_background_command(self, CustomServiceOrchestrator_mock, + runCommand_mock, read_stack_version_mock + ): + CustomServiceOrchestrator_mock.return_value = None + CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0, + 'stdout': 'out-11', + 'stderr' : 'err-13'} + + dummy_controller = MagicMock() + actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller) + + execute_command = copy.deepcopy(self.background_command) + actionQueue.put([execute_command]) + actionQueue.processBackgroundQueueSafeEmpty(); + actionQueue.processStatusCommandQueueSafeEmpty(); + + #assert that python execturor start + self.assertTrue(runCommand_mock.called) + runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId']) + self.assertTrue(runningCommand is not None) + self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS) + + report = actionQueue.result() + self.assertEqual(len(report['reports']),1) + + + + @patch.object(StackVersionsFileHandler, "read_stack_version") + @patch.object(CustomServiceOrchestrator, "resolve_script_path") + @patch.object(FileCache, "__init__") + def test_execute_python_executor(self, read_stack_version_mock, FileCache_mock, resolve_script_path_mock): + FileCache_mock.return_value = None + + + dummy_controller = MagicMock() + cfg = AmbariConfig().getConfig() + cfg.set('agent', 'tolerate_download_failures', 'true') + cfg.set('agent', 'prefix', '.') + cfg.set('agent', 'cache_dir', 'background_tasks') + + actionQueue = ActionQueue(cfg, dummy_controller) + patch_output_file(actionQueue.customServiceOrchestrator.python_executor) + actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock() + + result = {} + lock = threading.RLock() + complete_done = threading.Condition(lock) + start_done = threading.Condition(lock) + + def command_started_w(handle): + with lock: + result['command_started'] = {'handle': copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])} + start_done.notifyAll() + + def command_complete_w(process_condenced_result, handle): + with lock: + result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result), + 'handle' : copy.copy(handle), + 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId']) + } + complete_done.notifyAll() + + actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w) + actionQueue.on_background_command_started = wraped(actionQueue.on_background_command_started,None,command_started_w) + actionQueue.put([self.background_command]) + actionQueue.processBackgroundQueueSafeEmpty(); + actionQueue.processStatusCommandQueueSafeEmpty(); + + with lock: + start_done.wait(5) + + self.assertTrue(result.has_key('command_started'), 'command started callback was not fired') + started_handle = result['command_started']['handle'] + started_status = result['command_started']['command_status'] + + self.assertEqual(started_handle.pid, started_status['pid']) + self.assertTrue(started_handle.pid > 0, "PID was not assigned to handle") + self.assertEqual(started_status['status'], ActionQueue.IN_PROGRESS_STATUS) + + complete_done.wait(2) + + finished_handle = result['command_complete']['handle'] + self.assertEqual(started_handle.pid, finished_handle.pid) + finished_status = result['command_complete']['command_status'] + self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS) + self.assertEqual(finished_status['stdout'], 'process_out') + self.assertEqual(finished_status['stderr'], 'process_err') + self.assertEqual(finished_status['exitCode'], 0) + + + runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId']) + self.assertTrue(runningCommand is not None) + + report = actionQueue.result() + self.assertEqual(len(report['reports']),1) + self.assertEqual(report['reports'][0]['stdout'],'process_out') +# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}') + + + @patch.object(StackVersionsFileHandler, "read_stack_version") + @patch.object(FileCache, "__init__") + def test_cancel_backgound_command(self, read_stack_version_mock, FileCache_mock): + FileCache_mock.return_value = None + + dummy_controller = MagicMock() + cfg = AmbariConfig().getConfig() + cfg.set('agent', 'tolerate_download_failures', 'true') + cfg.set('agent', 'prefix', '.') + cfg.set('agent', 'cache_dir', 'background_tasks') + + actionQueue = ActionQueue(cfg, dummy_controller) + patch_output_file(actionQueue.customServiceOrchestrator.python_executor) + actionQueue.customServiceOrchestrator.python_executor.prepare_process_result = MagicMock() + actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock() + + lock = threading.RLock() + complete_done = threading.Condition(lock) + + def command_complete_w(process_condenced_result, handle): + with lock: + complete_done.wait(4) + + actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w) + execute_command = copy.deepcopy(self.background_command) + actionQueue.put([execute_command]) + actionQueue.processBackgroundQueueSafeEmpty(); + + time.sleep(1) + + actionQueue.process_command(self.cancel_background_command) + #TODO add assert + + with lock: + complete_done.notifyAll() + + +def patch_output_file(pythonExecutor): + def windows_py(command, tmpout, tmperr): + proc = MagicMock() + proc.pid = 33 + proc.returncode = 0 + with tmpout: + tmpout.write('process_out') + with tmperr: + tmperr.write('process_err') + return proc + def open_subporcess_files_win(fout, ferr, f): + return MagicMock(), MagicMock() + def read_result_from_files(out_path, err_path, structured_out_path): + return 'process_out', 'process_err', '{"a": "b."}' + pythonExecutor.launch_python_subprocess = windows_py + pythonExecutor.open_subporcess_files = open_subporcess_files_win + pythonExecutor.read_result_from_files = read_result_from_files + +def wraped(func, before = None, after = None): + def wrapper(*args, **kwargs): + if(before is not None): + before(*args, **kwargs) + ret = func(*args, **kwargs) + if(after is not None): + after(*args, **kwargs) + return ret + return wrapper + http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py index d669cd2..a1e1c66 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py @@ -39,6 +39,7 @@ import sys from AgentException import AgentException from FileCache import FileCache from LiveStatus import LiveStatus +from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle class TestCustomServiceOrchestrator(TestCase): @@ -396,6 +397,39 @@ class TestCustomServiceOrchestrator(TestCase): self.assertEqual(runCommand_mock.return_value, status) + @patch.object(CustomServiceOrchestrator, "dump_command_to_json") + @patch.object(FileCache, "__init__") + @patch.object(FileCache, "get_custom_actions_base_dir") + def test_runCommand_background_action(self, get_custom_actions_base_dir_mock, + FileCache_mock, + dump_command_to_json_mock): + FileCache_mock.return_value = None + get_custom_actions_base_dir_mock.return_value = "some path" + _, script = tempfile.mkstemp() + command = { + 'role' : 'any', + 'commandParams': { + 'script_type': 'PYTHON', + 'script': 'some_custom_action.py', + 'command_timeout': '600', + 'jdk_location' : 'some_location' + }, + 'taskId' : '13', + 'roleCommand': 'ACTIONEXECUTE', + 'commandType': 'BACKGROUND_EXECUTION_COMMAND', + '__handle' : BackgroundCommandExecutionHandle(None,13,MagicMock(), MagicMock()) + } + dummy_controller = MagicMock() + orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) + + import TestActionQueue + TestActionQueue.patch_output_file(orchestrator.python_executor) + orchestrator.python_executor.condenseOutput = MagicMock() + orchestrator.dump_command_to_json = MagicMock() + + ret = orchestrator.runCommand(command, "out.txt", "err.txt") + self.assertEqual(ret['exitcode'], 777) + def tearDown(self): # enable stdout sys.stdout = sys.__stdout__ http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml index 22aa454..1914c03 100644 --- a/ambari-server/pom.xml +++ b/ambari-server/pom.xml @@ -135,6 +135,8 @@ <exclude>src/main/resources/db/serial</exclude> <exclude>src/main/resources/db/index.txt</exclude> <exclude>src/main/resources/stacks/HDP/2.1.GlusterFS/services/YARN/package/templates/exclude_hosts_list.j2</exclude> + <exclude>src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/balancer-emulator/balancer-err.log</exclude> + <exclude>src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/balancer-emulator/balancer.log</exclude> <exclude>conf/unix/ca.config</exclude> <exclude>conf/unix/krb5JAASLogin.conf</exclude> <exclude>**/*.json</exclude> http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 9e3f69c..cab891f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -31,18 +31,13 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.reflect.TypeToken; -import com.google.inject.persist.UnitOfWork; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.ServiceComponentHostNotFoundException; import org.apache.ambari.server.ServiceComponentNotFoundException; import org.apache.ambari.server.agent.ActionQueue; +import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; import org.apache.ambari.server.agent.CancelCommand; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; @@ -65,6 +60,13 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.reflect.TypeToken; +import com.google.inject.persist.UnitOfWork; + /** @@ -193,7 +195,6 @@ class ActionScheduler implements Runnable { processCancelledRequestsList(); Set<Long> runningRequestIds = new HashSet<Long>(); - Set<String> affectedHosts = new HashSet<String>(); List<Stage> stages = db.getStagesInProgress(); if (LOG.isDebugEnabled()) { LOG.debug("Scheduler wakes up"); @@ -207,6 +208,10 @@ class ActionScheduler implements Runnable { return; } int i_stage = 0; + + + stages = filterParallelPerHostStages(stages); + for (Stage s : stages) { // Check if we can process this stage in parallel with another stages i_stage ++; @@ -225,20 +230,7 @@ class ActionScheduler implements Runnable { } } - List<String> stageHosts = s.getHosts(); - boolean conflict = false; - for (String host : stageHosts) { - if (affectedHosts.contains(host)) { - conflict = true; - break; - } - } - if (conflict) { - // Also we don't want to perform stages in parallel at the same hosts - continue; - } else { - affectedHosts.addAll(stageHosts); - } + // Commands that will be scheduled in current scheduler wakeup List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>(); @@ -354,6 +346,38 @@ class ActionScheduler implements Runnable { } } + /** + * Returns filtered list of stages following the rule: + * 1) remove stages that has the same host. Leave only first stage, the rest that have same host of any operation will be filtered + * 2) do not remove stages intersected by host if they have intersection by background command + * @param stages + * @return + */ + private List<Stage> filterParallelPerHostStages(List<Stage> stages) { + List<Stage> retVal = new ArrayList<Stage>(); + Set<String> affectedHosts = new HashSet<String>(); + for(Stage s : stages){ + for (String host : s.getHosts()) { + if (!affectedHosts.contains(host)) { + if(!isStageHasBackgroundCommandsOnly(s, host)){ + affectedHosts.add(host); + } + retVal.add(s); + } + } + } + return retVal; + } + + private boolean isStageHasBackgroundCommandsOnly(Stage s, String host) { + for (ExecutionCommandWrapper c : s.getExecutionCommands(host)) { + if(c.getExecutionCommand().getCommandType() != AgentCommandType.BACKGROUND_EXECUTION_COMMAND) + { + return false; + } + } + return true; + } /** * Executes internal ambari-server action http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java index 8703320..54faf6a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java @@ -31,6 +31,7 @@ public abstract class AgentCommand { public enum AgentCommandType { EXECUTION_COMMAND, + BACKGROUND_EXECUTION_COMMAND, STATUS_COMMAND, CANCEL_COMMAND, REGISTRATION_COMMAND http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index f2b5433..fa633c1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -563,6 +563,7 @@ public class HeartBeatHandler { throw new AmbariException("Could not get jaxb string for command", e); } switch (ac.getCommandType()) { + case BACKGROUND_EXECUTION_COMMAND: case EXECUTION_COMMAND: { response.addExecutionCommand((ExecutionCommand) ac); break; http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java index 4f9a8a4..80af575 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java @@ -402,15 +402,7 @@ public class AmbariMetaInfo { public boolean isValidServiceComponent(String stackName, String version, String serviceName, String componentName) throws AmbariException { ServiceInfo service = getServiceInfo(stackName, version, serviceName); - if (service == null) { - return false; - } - for (ComponentInfo compInfo : service.getComponents()) { - if (compInfo.getName().equals(componentName)) { - return true; - } - } - return false; + return service != null && service.getComponentByName(componentName) != null; } /** @@ -436,17 +428,12 @@ public class AmbariMetaInfo { || services.isEmpty()) { return retService; } - boolean found = false; for (Map.Entry<String, ServiceInfo> entry : services.entrySet()) { - for (ComponentInfo compInfo : entry.getValue().getComponents()) { - if (compInfo.getName().equals(componentName)) { - retService = entry.getKey(); - found = true; - break; - } - } - if (found) + ComponentInfo vu = entry.getValue().getComponentByName(componentName); + if(vu != null){ + retService = entry.getKey(); break; + } } return retService; } http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java index c702a45..65efa77 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java @@ -254,8 +254,7 @@ public class StackExtensionHelper { for (ComponentInfo childComponent : childService.getComponents()) { if (!childComponent.isDeleted()) { - ComponentInfo parentComponent = getComponent(parentService, - childComponent.getName()); + ComponentInfo parentComponent = parentService.getComponentByName(childComponent.getName()); if (parentComponent != null) { // If parent has similar component ComponentInfo mergedComponent = mergeComponents(parentComponent, childComponent); @@ -278,17 +277,6 @@ public class StackExtensionHelper { } } - - private ComponentInfo getComponent(ServiceInfo service, String componentName) { - for (ComponentInfo component : service.getComponents()) { - if (component.getName().equals(componentName)) { - return component; - } - } - return null; - } - - ComponentInfo mergeComponents(ComponentInfo parent, ComponentInfo child) { ComponentInfo result = new ComponentInfo(child); // cloning child CommandScriptDefinition commandScript = child.getCommandScript(); http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java index cf628d9..339194f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java @@ -42,6 +42,7 @@ import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.agent.ExecutionCommand.KeyNames; import org.apache.ambari.server.api.services.AmbariMetaInfo; @@ -53,6 +54,7 @@ import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.CommandScriptDefinition; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.ConfigHelper; +import org.apache.ambari.server.state.CustomCommandDefinition; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.HostComponentAdminState; import org.apache.ambari.server.state.MaintenanceState; @@ -236,7 +238,6 @@ public class AmbariCustomCommandExecutionHelper { throw new AmbariException(message); } - StackId stackId = cluster.getDesiredStackVersion(); AmbariMetaInfo ambariMetaInfo = managementController.getAmbariMetaInfo(); ServiceInfo serviceInfo = ambariMetaInfo.getServiceInfo @@ -244,6 +245,12 @@ public class AmbariCustomCommandExecutionHelper { StackInfo stackInfo = ambariMetaInfo.getStackInfo (stackId.getStackName(), stackId.getStackVersion()); + CustomCommandDefinition customCommandDefinition = null; + ComponentInfo ci = serviceInfo.getComponentByName(componentName); + if(ci != null){ + customCommandDefinition = ci.getCustomCommandByName(commandName); + } + long nowTimestamp = System.currentTimeMillis(); for (String hostName : candidateHosts) { @@ -271,6 +278,11 @@ public class AmbariCustomCommandExecutionHelper { ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName, componentName).getExecutionCommand(); + //set type background + if(customCommandDefinition != null && customCommandDefinition.isBackground()){ + execCmd.setCommandType(AgentCommandType.BACKGROUND_EXECUTION_COMMAND); + } + execCmd.setConfigurations(configurations); execCmd.setConfigurationAttributes(configurationAttributes); execCmd.setConfigurationTags(configTags); http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java index f8b952c..172b1ea 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java @@ -156,6 +156,14 @@ public class ComponentInfo { } return false; } + public CustomCommandDefinition getCustomCommandByName(String commandName){ + for(CustomCommandDefinition ccd : getCustomCommands()){ + if (ccd.getName().equals(commandName)){ + return ccd; + } + } + return null; + } public List<DependencyInfo> getDependencies() { return dependencies; http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java index 481245c..1161cc6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java @@ -563,7 +563,7 @@ public class ConfigHelper { ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(), sch.getServiceName()); - ComponentInfo componentInfo = getComponentInfo(serviceInfo,sch.getServiceComponentName()); + ComponentInfo componentInfo = serviceInfo.getComponentByName(sch.getServiceComponentName()); // Configs are considered stale when: // - desired type DOES NOT exist in actual // --- desired type DOES NOT exist in stack: not_stale @@ -621,15 +621,6 @@ public class ConfigHelper { return stale; } - private ComponentInfo getComponentInfo(ServiceInfo serviceInfo, String componentName) { - for(ComponentInfo componentInfo : serviceInfo.getComponents()) { - if(componentInfo.getName().equals(componentName)){ - return componentInfo; - } - } - return null; - } - /** * @return <code>true</code> if any service on the stack defines a property * for the type. http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java index a26e7be..72eeb50 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java @@ -30,15 +30,20 @@ public class CustomCommandDefinition { private String name; private CommandScriptDefinition commandScript; + private boolean background = false; public String getName() { return name; } + + public boolean isBackground() { + return background; + } public CommandScriptDefinition getCommandScript() { return commandScript; } - + @Override public boolean equals(Object obj) { if (obj == null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java index f2d9fe3..ecc5c11 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java @@ -180,7 +180,19 @@ public class ServiceInfo { if (components == null) components = new ArrayList<ComponentInfo>(); return components; } - + /** + * Finds ComponentInfo by component name + * @param componentName + * @return ComponentInfo componentName or null + */ + public ComponentInfo getComponentByName(String componentName){ + for(ComponentInfo componentInfo : getComponents()) { + if(componentInfo.getName().equals(componentName)){ + return componentInfo; + } + } + return null; + } public boolean isClientOnlyService() { if (components == null || components.isEmpty()) { return false; http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml index 37ba394..b9600dd 100644 --- a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml +++ b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml @@ -20,16 +20,6 @@ <actionDefinitions> <actionDefinition> - <actionName>ambari_hdfs_rebalancer</actionName> - <actionType>SYSTEM</actionType> - <inputs>threshold,[principal],[keytab]</inputs> - <targetService>HDFS</targetService> - <targetComponent>NAMENODE</targetComponent> - <defaultTimeout>600</defaultTimeout> - <description>HDFS Rebalance</description> - <targetType>ANY</targetType> - </actionDefinition> - <actionDefinition> <actionName>nagios_update_ignore</actionName> <actionType>SYSTEM</actionType> <inputs>[nagios_ignore]</inputs> @@ -59,4 +49,14 @@ <description>Validate if provided service config can be applied to specified hosts</description> <targetType>ALL</targetType> </actionDefinition> + <actionDefinition> + <actionName>cancel_background_task</actionName> + <actionType>SYSTEM</actionType> + <inputs></inputs> + <targetService></targetService> + <targetComponent></targetComponent> + <defaultTimeout>60</defaultTimeout> + <description>Cancel background task</description> + <targetType>ANY</targetType> + </actionDefinition> </actionDefinitions> http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py b/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py deleted file mode 100644 index fd1e5d1..0000000 --- a/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Ambari Agent - -""" - -from resource_management import * - - -class HdfsRebalance(Script): - def actionexecute(self, env): - config = Script.get_config() - - hdfs_user = config['configurations']['global']['hdfs_user'] - conf_dir = "/etc/hadoop/conf" - - _authentication = config['configurations']['core-site']['hadoop.security.authentication'] - security_enabled = ( not is_empty(_authentication) and _authentication == 'kerberos') - - threshold = config['commandParams']['threshold'] - - if security_enabled: - kinit_path_local = functions.get_kinit_path( - ["/usr/bin", "/usr/kerberos/bin", "/usr/sbin"]) - principal = config['commandParams']['principal'] - keytab = config['commandParams']['keytab'] - Execute(format("{kinit_path_local} -kt {keytab} {principal}")) - - ExecuteHadoop(format('balancer -threshold {threshold}'), - user=hdfs_user, - conf_dir=conf_dir, - logoutput=True - ) - - structured_output_example = { - 'result': 'Rebalancer completed.' - } - - self.put_structured_out(structured_output_example) - - -if __name__ == "__main__": - HdfsRebalance().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_actions/cancel_background_task.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py new file mode 100644 index 0000000..9f9b1ea --- /dev/null +++ b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Ambari Agent + +""" +from resource_management import Script +from ambari_agent import shell + +class CancelBackgroundTaskCommand(Script): + def actionexecute(self, env): + config = Script.get_config() + + cancel_command_pid = config['commandParams']['cancel_command_pid'] if config['commandParams'].has_key('cancel_command_pid') else None + cancel_task_id = config['commandParams']['cancel_task_id'] + if cancel_command_pid == None: + print "Nothing to cancel: there is no any task running with given taskId = '%s'" % cancel_task_id + else: + cancel_policy = config['commandParams']['cancel_policy'] + print "Send Kill to process pid = %s for task = %s with policy %s" % (cancel_command_pid, cancel_task_id, cancel_policy) + + shell.kill_process_with_children(cancel_command_pid) + print "Process pid = %s for task = %s has been killed successfully" % (cancel_command_pid, cancel_task_id) + +if __name__ == "__main__": + CancelBackgroundTaskCommand().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml index 7ac5e34..3d30e07 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml @@ -42,6 +42,14 @@ <timeout>600</timeout> </commandScript> </customCommand> + <customCommand> + <name>REBALANCEHDFS</name> + <background>true</background> + <commandScript> + <script>scripts/namenode.py</script> + <scriptType>PYTHON</scriptType> + </commandScript> + </customCommand> </customCommands> </component>
