AMBARI-17374. Ambari reports "IN PROGRESS" status for a finished install task. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/be17fbec Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/be17fbec Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/be17fbec Branch: refs/heads/branch-2.4 Commit: be17fbec298e33ec608b63bb4984b3ab3bbeee68 Parents: f8bb9f8 Author: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com> Authored: Wed Jun 22 19:13:02 2016 +0300 Committer: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com> Committed: Wed Jun 29 19:22:24 2016 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 9 +++- .../ambari_agent/CustomServiceOrchestrator.py | 5 +- .../TestCustomServiceOrchestrator.py | 2 +- .../server/actionmanager/ActionManager.java | 7 +++ .../server/actionmanager/ActionScheduler.java | 13 +++++ .../server/actionmanager/TestActionManager.java | 56 ++++++++++++++++++++ .../actionmanager/TestActionScheduler.java | 20 ++++--- .../server/agent/HeartbeatProcessorTest.java | 4 +- 8 files changed, 104 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index f217a54..60c72af 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -26,6 +26,7 @@ import pprint import os import ambari_simplejson as json import time +import signal from AgentException import AgentException from LiveStatus import LiveStatus @@ -141,7 +142,7 @@ class ActionQueue(threading.Thread): logger.info("Canceling " + queued_command['commandType'] + \ " for service " + queued_command['serviceName'] + \ " and role " + queued_command['role'] + \ - " with taskId " + queued_command['taskId']) + " with taskId " + str(queued_command['taskId'])) # Kill if in progress self.customServiceOrchestrator.cancel_command(task_id, reason) @@ -313,7 +314,11 @@ class ActionQueue(threading.Thread): if commandresult['exitcode'] == 0: status = self.COMPLETED_STATUS else: - status = self.FAILED_STATUS + if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL): + logger.info('Command {cid} was canceled!'.format(cid=taskId)) + return + else: + status = self.FAILED_STATUS if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0: delay = self.get_retry_delay(delay) http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index fc1b72a..57416a4 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -244,7 +244,10 @@ class CustomServiceOrchestrator(): logger.debug('Pop with taskId %s' % task_id) pid = self.commands_in_progress.pop(task_id) if not isinstance(pid, int): - return '\nCommand aborted. ' + pid + if pid: + return '\nCommand aborted. ' + pid + else: + return '' return None def requestComponentStatus(self, command): http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py index 0ff0ba5..c9724b7 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py @@ -440,7 +440,7 @@ class TestCustomServiceOrchestrator(TestCase): time.sleep(.1) - orchestrator.cancel_command(19,'') + orchestrator.cancel_command(19,'reason') self.assertTrue(kill_process_with_children_mock.called) kill_process_with_children_mock.assert_called_with(33) http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java index 71364c2..2b121dc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -141,6 +142,12 @@ public class ActionManager { return; } + Collections.sort(reports, new Comparator<CommandReport>() { + @Override + public int compare(CommandReport o1, CommandReport o2) { + return (int) (o1.getTaskId()-o2.getTaskId()); + } + }); List<CommandReport> reportsToProcess = new ArrayList<CommandReport>(); Iterator<HostRoleCommand> commandIterator = commands.iterator(); //persist the action response into the db. http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index b3aab9f..bf2ff38 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -722,6 +722,8 @@ class ActionScheduler implements Runnable { LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId()); actionQueue.dequeue(host, c.getCommandId()); } else { + cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr))); + LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host, roleStr, s.getActionId()); @@ -1072,6 +1074,17 @@ class ActionScheduler implements Runnable { } } } + void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) { + for (HostRoleCommand hostRoleCommand : hostRoleCommands) { + if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED || + hostRoleCommand.getStatus() == HostRoleStatus.IN_PROGRESS) { + CancelCommand cancelCommand = new CancelCommand(); + cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId()); + cancelCommand.setReason(""); + actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand); + } + } + } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java index baee0d8..f85b95d 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java @@ -134,6 +134,45 @@ public class TestActionManager { } @Test + public void testActionResponsesUnsorted() throws AmbariException { + ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class); + ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), + clusters, db, new HostsMap((String) null), unitOfWork, + injector.getInstance(RequestFactory.class), null, null); + populateActionDBWithTwoCommands(db, hostname); + Stage stage = db.getAllStages(requestId).get(0); + Assert.assertEquals(stageId, stage.getStageId()); + stage.setHostRoleStatus(hostname, "HBASE_MASTER", HostRoleStatus.QUEUED); + db.hostRoleScheduled(stage, hostname, "HBASE_MASTER"); + List<CommandReport> reports = new ArrayList<CommandReport>(); + CommandReport cr = new CommandReport(); + cr.setTaskId(2); + cr.setActionId(StageUtils.getActionId(requestId, stageId)); + cr.setRole("HBASE_REGIONSERVER"); + cr.setStatus("COMPLETED"); + cr.setStdErr("ERROR"); + cr.setStdOut("OUTPUT"); + cr.setStructuredOut("STRUCTURED_OUTPUT"); + cr.setExitCode(215); + reports.add(cr); + CommandReport cr2 = new CommandReport(); + cr2.setTaskId(1); + cr2.setActionId(StageUtils.getActionId(requestId, stageId)); + cr2.setRole("HBASE_MASTER"); + cr2.setStatus("IN_PROGRESS"); + cr2.setStdErr("ERROR"); + cr2.setStdOut("OUTPUT"); + cr2.setStructuredOut("STRUCTURED_OUTPUT"); + cr2.setExitCode(215); + reports.add(cr2); + am.processTaskResponse(hostname, reports, am.getTasks(Arrays.asList(new Long[]{1L, 2L}))); + assertEquals(HostRoleStatus.IN_PROGRESS, am.getAction(requestId, stageId) + .getHostRoleStatus(hostname, "HBASE_MASTER")); + assertEquals(HostRoleStatus.PENDING, am.getAction(requestId, stageId) + .getHostRoleStatus(hostname, "HBASE_REGIONSERVER")); + } + + @Test public void testLargeLogs() throws AmbariException { ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class); ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), @@ -189,6 +228,23 @@ public class TestActionManager { db.persistActions(request); } + private void populateActionDBWithTwoCommands(ActionDBAccessor db, String hostname) throws AmbariException { + Stage s = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L, "action manager test", "clusterHostInfo", "commandParamsStage", "hostParamsStage"); + s.setStageId(stageId); + s.addHostRoleExecutionCommand(hostname, Role.HBASE_MASTER, + RoleCommand.START, + new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(), + hostname, System.currentTimeMillis()), "cluster1", "HBASE", false, false); + s.addHostRoleExecutionCommand(hostname, Role.HBASE_REGIONSERVER, + RoleCommand.START, + new ServiceComponentHostStartEvent(Role.HBASE_REGIONSERVER.toString(), + hostname, System.currentTimeMillis()), "cluster1", "HBASE", false, false); + List<Stage> stages = new ArrayList<Stage>(); + stages.add(s); + Request request = new Request(stages, clusters); + db.persistActions(request); + } + // Test failing ... tracked by Jira BUG-4966 @Ignore @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index d92d87a..e0f67af 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -210,15 +210,18 @@ public class TestActionScheduler { scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); - assertTrue(ac.get(0) instanceof ExecutionCommand); - assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId()); - assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); + AgentCommand scheduledCommand = ac.get(0); + assertTrue(scheduledCommand instanceof ExecutionCommand); + assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId()); + assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo()); //The action status has not changed, it should be queued again. - ac = waitForQueueSize(hostname, aq, 1, scheduler); - assertTrue(ac.get(0) instanceof ExecutionCommand); - assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId()); - assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); + ac = waitForQueueSize(hostname, aq, 2, scheduler); + // first command is cancel for previous + scheduledCommand = ac.get(1); + assertTrue(scheduledCommand instanceof ExecutionCommand); + assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId()); + assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo()); //Now change the action status s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED); @@ -323,6 +326,9 @@ public class TestActionScheduler { //Check that in_progress command is rescheduled assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE")); + // Check was generated cancel command on timeout + assertFalse(aq.dequeue(hostname, AgentCommandType.CANCEL_COMMAND).isEmpty()); + //Switch command back to IN_PROGRESS status and check that other command is not rescheduled stages.get(0).setHostRoleStatus(hostname, "SECONDARY_NAMENODE", HostRoleStatus.IN_PROGRESS); scheduler.doWork(); http://git-wip-us.apache.org/repos/asf/ambari/blob/be17fbec/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java index bdbb9ab..913c4ea 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java @@ -1269,7 +1269,9 @@ public class HeartbeatProcessorTest { cmdReport.setRole("install_packages"); cmdReport.setClusterName(DummyCluster); - hb.setReports(Collections.singletonList(cmdReport)); + List<CommandReport> reports = new ArrayList<>(); + reports.add(cmdReport); + hb.setReports(reports); hb.setTimestamp(0L); hb.setResponseId(0); hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));