Repository: ambari Updated Branches: refs/heads/trunk 221f0e511 -> 2aee43d38
AMBARI-6837. Cancel background tasks for Rebalancer should follow the usual cancel request paradigm. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2aee43d3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2aee43d3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2aee43d3 Branch: refs/heads/trunk Commit: 2aee43d386e36f8c4999409db02078eb36c08e04 Parents: 221f0e5 Author: Mahadev Konar <[email protected]> Authored: Tue Aug 12 14:47:24 2014 -0700 Committer: Mahadev Konar <[email protected]> Committed: Tue Aug 12 14:47:24 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 26 +++-- .../ambari_agent/CustomServiceOrchestrator.py | 76 ++++++------- .../main/python/ambari_agent/PythonExecutor.py | 2 +- .../test/python/ambari_agent/TestActionQueue.py | 73 +++---------- .../TestCustomServiceOrchestrator.py | 69 ++++++++++++ .../ambari/server/actionmanager/Stage.java | 107 +++++++++---------- .../system_action_definitions.xml | 10 -- .../custom_actions/cancel_background_task.py | 41 ------- .../services/HDFS/package/scripts/namenode.py | 15 ++- .../BackgroundCustomCommandExecutionTest.java | 51 --------- ambari-web/app/controllers/main/service/item.js | 2 +- ambari-web/app/utils/ajax/ajax.js | 18 ++-- ambari-web/app/utils/host_progress_popup.js | 7 +- 13 files changed, 203 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 6437036..476955f 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -55,11 +55,9 @@ class ActionQueue(threading.Thread): STATUS_COMMAND = 'STATUS_COMMAND' EXECUTION_COMMAND = 'EXECUTION_COMMAND' BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND' - CANCEL_BACKGROUND_EXECUTION_COMMAND = 'CANCEL_BACKGROUND_EXECUTION_COMMAND' ROLE_COMMAND_INSTALL = 'INSTALL' ROLE_COMMAND_START = 'START' ROLE_COMMAND_STOP = 'STOP' - ROLE_COMMAND_CANCEL = 'CANCEL' ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND' CUSTOM_COMMAND_RESTART = 'RESTART' @@ -80,7 +78,7 @@ class ActionQueue(threading.Thread): self.configTags = {} self._stop = threading.Event() self.tmpdir = config.get('agent', 'prefix') - self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.commandStatuses) + self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller) def stop(self): @@ -172,7 +170,7 @@ class ActionQueue(threading.Thread): def createCommandHandle(self, command): if(command.has_key('__handle')): raise AgentException("Command already has __handle") - command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], self.on_background_command_started, self.on_background_command_complete_callback) + command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], None, self.on_background_command_complete_callback) return command def process_command(self, command): @@ -281,20 +279,26 @@ class ActionQueue(threading.Thread): self.commandStatuses.put_command_status(command, roleResult) - def on_background_command_started(self, handle): - #update command with given handle - self.commandStatuses.update_command_status(handle.command, {'pid' : handle.pid}) - - + def command_was_canceled(self): + self.customServiceOrchestrator def on_background_command_complete_callback(self, process_condenced_result, handle): logger.debug('Start callback: %s' % process_condenced_result) logger.debug('The handle is: %s' % handle) status = 
self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS + + aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId']) + if aborted_postfix: + status = self.FAILED_STATUS + logger.debug('Set status to: %s , reason = %s' % (status, aborted_postfix)) + else: + aborted_postfix = '' + + roleResult = self.commandStatuses.generate_report_template(handle.command) roleResult.update({ - 'stdout': process_condenced_result['stdout'], - 'stderr': process_condenced_result['stderr'], + 'stdout': process_condenced_result['stdout'] + aborted_postfix, + 'stderr': process_condenced_result['stderr'] + aborted_postfix, 'exitCode': process_condenced_result['exitcode'], 'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '', 'status': status, http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 093fc22..94aa87e 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -23,6 +23,7 @@ import os import json import sys import shell +import threading from FileCache import FileCache from AgentException import AgentException @@ -50,7 +51,7 @@ class CustomServiceOrchestrator(): PING_PORTS_KEY = "all_ping_ports" AMBARI_SERVER_HOST = "ambari_server_host" - def __init__(self, config, controller, commandStatuses = None): + def __init__(self, config, controller): self.config = config self.tmp_dir = config.get('agent', 'prefix') self.exec_tmp_dir = config.get('agent', 'tmp_dir') @@ -64,26 +65,31 @@ class CustomServiceOrchestrator(): # cache reset will be called 
on every agent registration controller.registration_listeners.append(self.file_cache.reset) - self.commandStatuses = commandStatuses # Clean up old status command files if any try: os.unlink(self.status_commands_stdout) os.unlink(self.status_commands_stderr) except OSError: pass # Ignore fail + self.commands_in_progress_lock = threading.RLock() self.commands_in_progress = {} def map_task_to_process(self, task_id, processId): - self.commands_in_progress[task_id] = processId + with self.commands_in_progress_lock: + logger.debug('Maps taskId=%s to pid=%s'%(task_id, processId)) + self.commands_in_progress[task_id] = processId def cancel_command(self, task_id, reason): - if task_id in self.commands_in_progress.keys(): - pid = self.commands_in_progress.get(task_id) - self.commands_in_progress[task_id] = reason - logger.info("Canceling command with task_id - {tid}, " \ - "reason - {reason} . Killing process {pid}" - .format(tid = str(task_id), reason = reason, pid = pid)) - shell.kill_process_with_children(pid) + with self.commands_in_progress_lock: + if task_id in self.commands_in_progress.keys(): + pid = self.commands_in_progress.get(task_id) + self.commands_in_progress[task_id] = reason + logger.info("Canceling command with task_id - {tid}, " \ + "reason - {reason} . 
Killing process {pid}" + .format(tid = str(task_id), reason = reason, pid = pid)) + shell.kill_process_with_children(pid) + else: + logger.warn("Unable to find pid by taskId = %s"%task_id) def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None, override_output_files = True): @@ -95,7 +101,6 @@ class CustomServiceOrchestrator(): script_type = command['commandParams']['script_type'] script = command['commandParams']['script'] timeout = int(command['commandParams']['command_timeout']) - before_interceptor_method = command['commandParams']['before_system_hook_function'] if command['commandParams'].has_key('before_system_hook_function') else None if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']: server_url_prefix = command['hostLevelParams']['jdk_location'] @@ -114,12 +119,6 @@ class CustomServiceOrchestrator(): if command_name == self.CUSTOM_ACTION_COMMAND: base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix) script_tuple = (os.path.join(base_dir, script) , base_dir) - - # Call systemHook functions in current virtual machine. This function can enrich custom action - # command with some information from current machine. 
And can be considered as plugin - if before_interceptor_method != None: - self.processSystemHookFunctions(script_tuple, before_interceptor_method, command) - hook_dir = None else: if command_name == self.CUSTOM_COMMAND_COMMAND: @@ -140,6 +139,7 @@ class CustomServiceOrchestrator(): handle = None if(command.has_key('__handle')): handle = command['__handle'] + handle.on_background_command_started = self.map_task_to_process del command['__handle'] json_path = self.dump_command_to_json(command) @@ -155,7 +155,6 @@ class CustomServiceOrchestrator(): # Executing hooks and script ret = None - from ActionQueue import ActionQueue if(command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1): raise AgentException("Background commands are supported without hooks only") @@ -174,18 +173,17 @@ class CustomServiceOrchestrator(): if not ret: # Something went wrong raise AgentException("No script has been executed") - # if canceled - if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO) - pid = self.commands_in_progress.pop(task_id) - if not isinstance(pid, int): - reason = '\nCommand aborted. 
' + pid - ret['stdout'] += reason - ret['stderr'] += reason + # if canceled and not background command + if handle is None: + cancel_reason = self.command_canceled_reason(task_id) + if cancel_reason: + ret['stdout'] += cancel_reason + ret['stderr'] += cancel_reason with open(tmpoutfile, "a") as f: - f.write(reason) + f.write(cancel_reason) with open(tmperrfile, "a") as f: - f.write(reason) + f.write(cancel_reason) except Exception: # We do not want to let agent fail completely exc_type, exc_obj, exc_tb = sys.exc_info() @@ -199,21 +197,15 @@ class CustomServiceOrchestrator(): 'exitcode': 1, } return ret - - def fetch_bg_pid_by_taskid(self,command): - cancel_command_pid = None - try: - cancelTaskId = int(command['commandParams']['cancel_task_id']) - status = self.commandStatuses.get_command_status(cancelTaskId) - cancel_command_pid = status['pid'] - except Exception: - pass - logger.info("Found PID=%s for cancel taskId=%s" % (cancel_command_pid,cancelTaskId)) - command['commandParams']['cancel_command_pid'] = cancel_command_pid - - def processSystemHookFunctions(self, script_tuple, before_interceptor_method, command): - getattr(self, before_interceptor_method)(command) - + def command_canceled_reason(self, task_id): + with self.commands_in_progress_lock: + if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO) + logger.debug('Pop with taskId %s' % task_id) + pid = self.commands_in_progress.pop(task_id) + if not isinstance(pid, int): + return '\nCommand aborted. ' + pid + return None + def requestComponentStatus(self, command): """ Component status is determined by exit code, returned by runCommand(). 
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py index d130497..874b70b 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py @@ -196,7 +196,7 @@ class BackgroundThread(threading.Thread): self.holder.handle.pid = process.pid self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS - self.holder.handle.on_background_command_started(self.holder.handle) + self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid) process.communicate() http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 4447670..f582a68 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -618,12 +618,6 @@ class TestActionQueue(TestCase): result = {} lock = threading.RLock() complete_done = threading.Condition(lock) - start_done = threading.Condition(lock) - - def command_started_w(handle): - with lock: - result['command_started'] = {'handle': copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])} - start_done.notifyAll() def command_complete_w(process_condenced_result, handle): with lock: @@ -634,26 +628,13 @@ class TestActionQueue(TestCase): complete_done.notifyAll() actionQueue.on_background_command_complete_callback = 
wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w) - actionQueue.on_background_command_started = wraped(actionQueue.on_background_command_started,None,command_started_w) actionQueue.put([self.background_command]) actionQueue.processBackgroundQueueSafeEmpty(); actionQueue.processStatusCommandQueueSafeEmpty(); with lock: - start_done.wait(5) - - self.assertTrue(result.has_key('command_started'), 'command started callback was not fired') - started_handle = result['command_started']['handle'] - started_status = result['command_started']['command_status'] - - self.assertEqual(started_handle.pid, started_status['pid']) - self.assertTrue(started_handle.pid > 0, "PID was not assigned to handle") - self.assertEqual(started_status['status'], ActionQueue.IN_PROGRESS_STATUS) - - complete_done.wait(2) + complete_done.wait(.1) - finished_handle = result['command_complete']['handle'] - self.assertEqual(started_handle.pid, finished_handle.pid) finished_status = result['command_complete']['command_status'] self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS) self.assertEqual(finished_status['stdout'], 'process_out') @@ -667,46 +648,24 @@ class TestActionQueue(TestCase): report = actionQueue.result() self.assertEqual(len(report['reports']),1) self.assertEqual(report['reports'][0]['stdout'],'process_out') -# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}') - - - @patch.object(StackVersionsFileHandler, "read_stack_version") - @patch.object(FileCache, "__init__") - def test_cancel_backgound_command(self, read_stack_version_mock, FileCache_mock): - FileCache_mock.return_value = None +# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}') - dummy_controller = MagicMock() - cfg = AmbariConfig().getConfig() - cfg.set('agent', 'tolerate_download_failures', 'true') - cfg.set('agent', 'prefix', '.') - cfg.set('agent', 'cache_dir', 'background_tasks') - actionQueue = ActionQueue(cfg, dummy_controller) 
- patch_output_file(actionQueue.customServiceOrchestrator.python_executor) - actionQueue.customServiceOrchestrator.python_executor.prepare_process_result = MagicMock() - actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock() + + cancel_background_command = { + "commandType":"CANCEL_COMMAND", + "role":"AMBARI_SERVER_ACTION", + "roleCommand":"ABORT", + "commandId":"2--1", + "taskId":20, + "clusterName":"c1", + "serviceName":"", + "hostname":"c6401", + "roleParams":{ + "cancelTaskIdTargets":"13,14" + }, + } - lock = threading.RLock() - complete_done = threading.Condition(lock) - - def command_complete_w(process_condenced_result, handle): - with lock: - complete_done.wait(4) - - actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w) - execute_command = copy.deepcopy(self.background_command) - actionQueue.put([execute_command]) - actionQueue.processBackgroundQueueSafeEmpty(); - - time.sleep(1) - - actionQueue.process_command(self.cancel_background_command) - #TODO add assert - - with lock: - complete_done.notifyAll() - - def patch_output_file(pythonExecutor): def windows_py(command, tmpout, tmperr): proc = MagicMock() http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py index a1e1c66..92791e2 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py @@ -40,6 +40,7 @@ from AgentException import AgentException from FileCache import FileCache from LiveStatus import LiveStatus from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle 
+from ambari_agent.ActionQueue import ActionQueue class TestCustomServiceOrchestrator(TestCase): @@ -304,6 +305,74 @@ class TestCustomServiceOrchestrator(TestCase): self.assertTrue(os.path.exists(err)) os.remove(out) os.remove(err) + + from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler + + @patch("shell.kill_process_with_children") + @patch.object(FileCache, "__init__") + @patch.object(CustomServiceOrchestrator, "resolve_script_path") + @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path") + @patch.object(StackVersionsFileHandler, "read_stack_version") + def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock, + kill_process_with_children_mock): + FileCache_mock.return_value = None + FileCache_mock.cache_dir = MagicMock() + resolve_hook_script_path_mock.return_value = None +# shell.kill_process_with_children = MagicMock() + dummy_controller = MagicMock() + cfg = AmbariConfig().getConfig() + cfg.set('agent', 'tolerate_download_failures', 'true') + cfg.set('agent', 'prefix', '.') + cfg.set('agent', 'cache_dir', 'background_tasks') + + actionQueue = ActionQueue(cfg, dummy_controller) + + dummy_controller.actionQueue = actionQueue + orchestrator = CustomServiceOrchestrator(cfg, dummy_controller) + orchestrator.file_cache = MagicMock() + def f (a, b): + return "" + orchestrator.file_cache.get_service_base_dir = f + actionQueue.customServiceOrchestrator = orchestrator + + import TestActionQueue + import copy + + TestActionQueue.patch_output_file(orchestrator.python_executor) + orchestrator.python_executor.prepare_process_result = MagicMock() + orchestrator.dump_command_to_json = MagicMock() + + lock = threading.RLock() + complete_done = threading.Condition(lock) + + complete_was_called = {} + def command_complete_w(process_condenced_result, handle): + with lock: + complete_was_called['visited']= '' + complete_done.wait(3) + + 
actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None) + execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command) + actionQueue.put([execute_command]) + actionQueue.processBackgroundQueueSafeEmpty() + + time.sleep(.1) + + orchestrator.cancel_command(19,'') + self.assertTrue(kill_process_with_children_mock.called) + kill_process_with_children_mock.assert_called_with(33) + + with lock: + complete_done.notifyAll() + + with lock: + self.assertTrue(complete_was_called.has_key('visited')) + + time.sleep(.1) + + runningCommand = actionQueue.commandStatuses.get_command_status(19) + self.assertTrue(runningCommand is not None) + self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS) @patch.object(CustomServiceOrchestrator, "dump_command_to_json") http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java index 83d46eb..c4bbb46 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; +import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; @@ -38,10 +39,13 @@ import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.serveraction.ServerAction; 
import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ServiceComponentHostEvent; +import org.apache.ambari.server.state.fsm.event.Event; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; import org.apache.ambari.server.utils.StageUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.Assert; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; @@ -216,51 +220,58 @@ public class Stage { public String getActionId() { return StageUtils.getActionId(requestId, getStageId()); } - - /** - * A new host role command is created for execution. - * Creates both ExecutionCommand and HostRoleCommand objects and - * adds them to the Stage. This should be called only once for a host-role - * for a given stage. - */ - public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command, - ServiceComponentHostEvent event, String clusterName, String serviceName) { + + private synchronized ExecutionCommandWrapper addGenericExecutionCommand(String clusterName, String hostName, Role role, RoleCommand command, ServiceComponentHostEvent event){ //used on stage creation only, no need to check if wrappers loaded - HostRoleCommand hrc = new HostRoleCommand(host, role, event, command); + HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command); ExecutionCommand cmd = new ExecutionCommand(); ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd); hrc.setExecutionCommandWrapper(wrapper); - cmd.setHostname(host); + cmd.setHostname(hostName); cmd.setClusterName(clusterName); - cmd.setServiceName(serviceName); cmd.setCommandId(this.getActionId()); cmd.setRole(role.name()); cmd.setRoleCommand(command); - Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(host); + cmd.setServiceName(""); + + Map<String, HostRoleCommand> hrcMap = 
this.hostRoleCommands.get(hostName); if (hrcMap == null) { hrcMap = new LinkedHashMap<String, HostRoleCommand>(); - this.hostRoleCommands.put(host, hrcMap); + this.hostRoleCommands.put(hostName, hrcMap); } if (hrcMap.get(role.toString()) != null) { throw new RuntimeException( "Setting the host role command second time for same stage: stage=" - + this.getActionId() + ", host=" + host + ", role=" + role); + + this.getActionId() + ", host=" + hostName + ", role=" + role); } hrcMap.put(role.toString(), hrc); - List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(host); + List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName); if (execCmdList == null) { execCmdList = new ArrayList<ExecutionCommandWrapper>(); - this.commandsToSend.put(host, execCmdList); + this.commandsToSend.put(hostName, execCmdList); } - + if (execCmdList.contains(wrapper)) { //todo: proper exception throw new RuntimeException( "Setting the execution command second time for same stage: stage=" - + this.getActionId() + ", host=" + host + ", role=" + role); + + this.getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event); } execCmdList.add(wrapper); + return wrapper; + } + /** + * A new host role command is created for execution. + * Creates both ExecutionCommand and HostRoleCommand objects and + * adds them to the Stage. This should be called only once for a host-role + * for a given stage. + */ + public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command, + ServiceComponentHostEvent event, String clusterName, String serviceName) { + ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, host, role, command, event); + + commandWrapper.getExecutionCommand().setServiceName(serviceName); } @@ -268,51 +279,33 @@ public class Stage { * Creates server-side execution command. 
As of now, it seems to * be used only for server upgrade */ - public synchronized void addServerActionCommand( - String actionName, Role role, RoleCommand command, String clusterName, + public synchronized void addServerActionCommand(String actionName, Role role, RoleCommand command, String clusterName, ServiceComponentHostUpgradeEvent event, String hostName) { - //used on stage creation only, no need to check if wrappers loaded - HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command); - ExecutionCommand cmd = new ExecutionCommand(); - ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd); - hrc.setExecutionCommandWrapper(wrapper); - cmd.setHostname(hostName); - cmd.setClusterName(clusterName); - cmd.setServiceName(""); - cmd.setCommandId(this.getActionId()); - cmd.setRole(role.name()); - cmd.setRoleCommand(command); - + ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, role, command, event); + ExecutionCommand cmd = commandWrapper.getExecutionCommand(); + Map<String, String> roleParams = new HashMap<String, String>(); roleParams.put(ServerAction.ACTION_NAME, actionName); cmd.setRoleParams(roleParams); - Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName); - if (hrcMap == null) { - hrcMap = new LinkedHashMap<String, HostRoleCommand>(); - this.hostRoleCommands.put(hostName, hrcMap); - } - if (hrcMap.get(role.toString()) != null) { - throw new RuntimeException( - "Setting the server action the second time for same stage: stage=" - + this.getActionId() + ", action=" + actionName); - } - hrcMap.put(role.toString(), hrc); - List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName); - if (execCmdList == null) { - execCmdList = new ArrayList<ExecutionCommandWrapper>(); - this.commandsToSend.put(hostName, execCmdList); - } - - if (execCmdList.contains(wrapper)) { - //todo: proper exception - throw new RuntimeException( - "Setting the execution command second 
time for same stage: stage=" - + this.getActionId() + ", action=" + actionName); - } - execCmdList.add(wrapper); } /** + * Adds cancel command to stage for given cancelTargets collection of task id's that has to be canceled in Agent layer. + */ + public synchronized void addCancelRequestCommand(List<Long> cancelTargets, String clusterName, String hostName) { + ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, Role.AMBARI_SERVER_ACTION, RoleCommand.ABORT, null); + ExecutionCommand cmd = commandWrapper.getExecutionCommand(); + cmd.setCommandType(AgentCommandType.CANCEL_COMMAND); + + Assert.notEmpty(cancelTargets, "Provided targets task Id are empty."); + + Map<String, String> roleParams = new HashMap<String, String>(); + + roleParams.put("cancelTaskIdTargets", StringUtils.join(cancelTargets, ',')); + cmd.setRoleParams(roleParams); + } + + /** * * @return list of hosts */ http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml index b9600dd..c65a496 100644 --- a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml +++ b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml @@ -49,14 +49,4 @@ <description>Validate if provided service config can be applied to specified hosts</description> <targetType>ALL</targetType> </actionDefinition> - <actionDefinition> - <actionName>cancel_background_task</actionName> - <actionType>SYSTEM</actionType> - <inputs></inputs> - <targetService></targetService> - <targetComponent></targetComponent> - <defaultTimeout>60</defaultTimeout> - <description>Cancel background 
task</description> - <targetType>ANY</targetType> - </actionDefinition> </actionDefinitions> http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/custom_actions/cancel_background_task.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py deleted file mode 100644 index 9f9b1ea..0000000 --- a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
- -Ambari Agent - -""" -from resource_management import Script -from ambari_agent import shell - -class CancelBackgroundTaskCommand(Script): - def actionexecute(self, env): - config = Script.get_config() - - cancel_command_pid = config['commandParams']['cancel_command_pid'] if config['commandParams'].has_key('cancel_command_pid') else None - cancel_task_id = config['commandParams']['cancel_task_id'] - if cancel_command_pid == None: - print "Nothing to cancel: there is no any task running with given taskId = '%s'" % cancel_task_id - else: - cancel_policy = config['commandParams']['cancel_policy'] - print "Send Kill to process pid = %s for task = %s with policy %s" % (cancel_command_pid, cancel_task_id, cancel_policy) - - shell.kill_process_with_children(cancel_command_pid) - print "Process pid = %s for task = %s has been killed successfully" % (cancel_command_pid, cancel_task_id) - -if __name__ == "__main__": - CancelBackgroundTaskCommand().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py index 3b320d1..f401122 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py @@ -74,14 +74,15 @@ class NameNode(Script): namenode(action="decommission") pass + def rebalancehdfs(self, env): import params env.set_params(params) name_node_parameters = json.loads( params.name_node_params ) threshold = name_node_parameters['threshold'] - print "Starting balancer with threshold = %s" % threshold - + _print("Starting balancer with threshold = %s\n" % threshold) + 
def calculateCompletePercent(first, current): return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove @@ -97,7 +98,7 @@ class NameNode(Script): basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator') command = ['python','hdfs-command.py'] - print "Executing command %s" % command + _print("Executing command %s\n" % command) parser = hdfs_rebalance.HdfsParser() proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -106,7 +107,7 @@ class NameNode(Script): cwd=basedir ) for line in iter(proc.stdout.readline, ''): - sys.stdout.write('[balancer] %s %s' % (str(datetime.now()), line )) + _print('[balancer] %s %s' % (str(datetime.now()), line )) pl = parser.parseLine(line) if pl: res = pl.toJson() @@ -114,10 +115,14 @@ class NameNode(Script): self.put_structured_out(res) elif parser.state == 'PROCESS_FINISED' : - sys.stdout.write('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' )) + _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' )) self.put_structured_out({'completePercent' : 1}) break +def _print(line): + sys.stdout.write(line) + sys.stdout.flush() + if __name__ == "__main__": NameNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java index d49cadd..76b9fbc 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java @@ -146,57 +146,6 @@ public class BackgroundCustomCommandExecutionTest { 
Assert.fail(e.getMessage()); } } - @SuppressWarnings("serial") - @Test - public void testCancelCommand() { - try { - createClusterFixture(); - - Map<String, String> requestProperties = new HashMap<String, String>() { - { - put(REQUEST_CONTEXT_PROPERTY, "Stop background command"); -// put("cancel_policy","SIGKILL"); -// put("cancel_task_id","19"); - } - }; - - ExecuteActionRequest actionRequest = new ExecuteActionRequest( - "c1", - "actionexecute","cancel_background_task", - null, - null, - new HashMap<String, String>(){{ - put("cancel_policy","SIGKILL"); // parameters/cancel_policy -- in request params - put("cancel_task_id","19"); - }}); - actionRequest.getResourceFilters().add(new RequestResourceFilter("HDFS", "NAMENODE", Collections.singletonList("c6401"))); - - controller.createAction(actionRequest, requestProperties); - - Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class)); - - List<Stage> stages = stagesCaptor.getValue(); - Assert.assertEquals(1, stages.size()); - Stage stage = stages.get(0); - - Assert.assertEquals(1, stage.getHosts().size()); - - List<ExecutionCommandWrapper> commands = stage.getExecutionCommands("c6401"); - Assert.assertEquals(1, commands.size()); - - ExecutionCommand command = commands.get(0).getExecutionCommand(); - - Assert.assertEquals(AgentCommandType.EXECUTION_COMMAND, command.getCommandType()); - Assert.assertEquals("ACTIONEXECUTE", command.getRoleCommand().name()); - Assert.assertEquals("cancel_background_task.py", command.getCommandParams().get("script")); - Assert.assertEquals("SIGKILL", command.getCommandParams().get("cancel_policy")); - Assert.assertEquals("19", command.getCommandParams().get("cancel_task_id")); - - - } catch (AmbariException e) { - Assert.fail(e.getMessage()); - } - } private void createClusterFixture() throws AmbariException { createCluster("c1"); http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/controllers/main/service/item.js 
---------------------------------------------------------------------- diff --git a/ambari-web/app/controllers/main/service/item.js b/ambari-web/app/controllers/main/service/item.js index f05940e..5593c72 100644 --- a/ambari-web/app/controllers/main/service/item.js +++ b/ambari-web/app/controllers/main/service/item.js @@ -280,7 +280,7 @@ App.MainServiceItemController = Em.Controller.extend({ errorMessage: Em.I18n.t('services.service.actions.run.rebalanceHdfsNodes.promptError'), isInvalid: function () { var intValue = Number(this.get('inputValue')); - return isNaN(intValue) || intValue < 1 || intValue > 100; + return this.get('inputValue')!=='DEBUG' && (isNaN(intValue) || intValue < 1 || intValue > 100); }.property('inputValue'), disablePrimary : function() { return this.get('isInvalid'); http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/utils/ajax/ajax.js ---------------------------------------------------------------------- diff --git a/ambari-web/app/utils/ajax/ajax.js b/ambari-web/app/utils/ajax/ajax.js index 1cb7846..0de25d4 100644 --- a/ambari-web/app/utils/ajax/ajax.js +++ b/ambari-web/app/utils/ajax/ajax.js @@ -340,26 +340,20 @@ var urls = { }, 'cancel.background.operation' : { - 'real' : '/clusters/{clusterName}/requests', + 'real' : '/clusters/{clusterName}/requests/{requestId}', 'mock' : '', 'format' : function(data) { return { - type : 'POST', + type : 'PUT', data : JSON.stringify({ RequestInfo : { - 'context' : 'Cancel background operation', - 'action' : 'cancel_background_task', + 'context' : 'Cancel operation', "parameters" : { - "cancel_policy" : "SIGKILL", - 'before_system_hook_function' : 'fetch_bg_pid_by_taskid', - "cancel_task_id" : data.cancelTaskId + "cancel_policy" : "SIGKILL" } }, - "Requests/resource_filters" : [ { - "service_name" : data.serviceName, - "component_name" : data.componentName, - 'hosts' : data.hosts - } ] + "Requests/request_status":'ABORTED', + "Requests/abort_reason": "Cancel background operation" }) 
} } http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/utils/host_progress_popup.js ---------------------------------------------------------------------- diff --git a/ambari-web/app/utils/host_progress_popup.js b/ambari-web/app/utils/host_progress_popup.js index bcd3563..0da534d 100644 --- a/ambari-web/app/utils/host_progress_popup.js +++ b/ambari-web/app/utils/host_progress_popup.js @@ -958,12 +958,7 @@ App.HostPopup = Em.Object.create({ name : 'cancel.background.operation', sender : hostPopup, data : { - cancelTaskId : hostPopup.get('openedTaskId'), - command : "REFRESHQUEUES", - context : Em.I18n.t('services.service.actions.run.yarnRefreshQueues.context') , - hosts : App.Service.find('HDFS').get('hostComponents').findProperty('componentName', 'NAMENODE').get('hostName'), - serviceName : "HDFS", - componentName : "NAMENODE" + requestId : hostPopup.get('controller.currentServiceId') } }); hostPopup.backToServiceList();
