AMBARI-19930. The service check status was set to TIMEOUT even if the service check had failed. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c9bea4ab Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c9bea4ab Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c9bea4ab Branch: refs/heads/branch-feature-AMBARI-12556 Commit: c9bea4ab8f8a042e60103e8bb8880fc718fa3cf3 Parents: fc9788a Author: Myroslav Papirkovskyi <[email protected]> Authored: Thu Feb 9 20:11:26 2017 +0200 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Fri Feb 10 16:00:21 2017 +0200 ---------------------------------------------------------------------- .../server/actionmanager/ActionScheduler.java | 38 +++++++++++--- .../actionmanager/TestActionScheduler.java | 54 ++++++++------------ 2 files changed, 53 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c9bea4ab/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index dabcb98..fa2ad4f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -845,6 +845,28 @@ class ActionScheduler implements Runnable { commandsToSchedule.add(c); LOG.trace("===> commandsToSchedule(reschedule)=" + commandsToSchedule.size()); } + } else if (isHostStateUnknown(s, hostObj, roleStr)) { + String message = "Action was aborted due agent is not heartbeating or was restarted."; + LOG.warn("Host: {}, role: {}, actionId: {} . 
{}", host, roleStr, + s.getActionId(), message); + + db.abortHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), message); + + if (null != cluster) { + if (!RoleCommand.CUSTOM_COMMAND.equals(c.getRoleCommand()) + && !RoleCommand.SERVICE_CHECK.equals(c.getRoleCommand()) + && !RoleCommand.ACTIONEXECUTE.equals(c.getRoleCommand())) { + //commands above don't affect host component state (e.g. no in_progress state in process), transition will fail + transitionToFailedState(cluster.getClusterName(), c.getServiceName(), roleStr, host, now, false); + } + if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) { + processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr); + } + } + + // Dequeue command + LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId()); + actionQueue.dequeue(host, c.getCommandId()); } else if (status.equals(HostRoleStatus.PENDING)) { // in case of DEPENDENCY_ORDERED stage command can be scheduled only if all of it's dependencies are // already finished @@ -1030,13 +1052,6 @@ class ActionScheduler implements Runnable { return false; } - // Fast fail task if host state is unknown - if (null != host && - (host.getState().equals(HostState.HEARTBEAT_LOST) || wasAgentRestartedDuringOperation(host, stage, role))) { - LOG.debug("Timing out action since agent is not heartbeating or agent was restarted."); - return true; - } - // tasks are held in a variety of in-memory maps that require a hostname key // host being null is ok - that means it's a server-side task String hostName = (null == host) ? 
null : host.getHostName(); @@ -1053,6 +1068,15 @@ class ActionScheduler implements Runnable { return false; } + private boolean isHostStateUnknown(Stage stage, Host host, String role) { + if (null != host && + (host.getState().equals(HostState.HEARTBEAT_LOST) || wasAgentRestartedDuringOperation(host, stage, role))) { + LOG.debug("Abort action since agent is not heartbeating or agent was restarted."); + return true; + } + return false; + } + private boolean hasCommandInProgress(Stage stage, String host) { List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(host); for (ExecutionCommandWrapper wrapper : commandWrappers) { http://git-wip-us.apache.org/repos/asf/ambari/blob/c9bea4ab/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index ade625a..653ad2c 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -391,7 +391,7 @@ public class TestActionScheduler { when(host.getState()).thenReturn(HostState.HEARTBEAT_LOST); when(host.getHostName()).thenReturn(hostname); - List<Stage> stages = new ArrayList<Stage>(); + final List<Stage> stages = new ArrayList<Stage>(); final Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); stages.add(s); @@ -404,16 +404,26 @@ public class TestActionScheduler { when(db.getCommandsInProgressCount()).thenReturn(stages.size()); when(db.getStagesInProgress()).thenReturn(stages); + doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws 
Throwable { - String host = (String) invocation.getArguments()[0]; - String role = (String) invocation.getArguments()[3]; - HostRoleCommand command = s.getHostRoleCommand(host, role); - command.setStatus(HostRoleStatus.TIMEDOUT); + Long requestId = (Long) invocation.getArguments()[1]; + for (Stage stage : stages) { + if (requestId.equals(stage.getRequestId())) { + for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { + if (command.getStatus() == HostRoleStatus.QUEUED || + command.getStatus() == HostRoleStatus.IN_PROGRESS || + command.getStatus() == HostRoleStatus.PENDING) { + command.setStatus(HostRoleStatus.ABORTED); + } + } + } + } + return null; } - }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString(), anyBoolean()); + }).when(db).abortHostRole(anyString(), anyLong(), anyLong(), anyString(), anyString()); //Small action timeout to test rescheduling AmbariEventPublisher aep = EasyMock.createNiceMock(AmbariEventPublisher.class); @@ -423,18 +433,16 @@ public class TestActionScheduler { mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class)). addMockedMethod("cancelHostRoleCommands"). 
createMock(); - scheduler.cancelHostRoleCommands(EasyMock.<Collection<HostRoleCommand>>anyObject(),EasyMock.anyObject(String.class)); - EasyMock.expectLastCall(); EasyMock.replay(scheduler); scheduler.setTaskTimeoutAdjustment(false); int cycleCount=0; while (!stages.get(0).getHostRoleStatus(hostname, "NAMENODE") - .equals(HostRoleStatus.TIMEDOUT) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + .equals(HostRoleStatus.ABORTED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { scheduler.doWork(); } - Assert.assertEquals(HostRoleStatus.TIMEDOUT,stages.get(0).getHostRoleStatus(hostname, "NAMENODE")); + Assert.assertEquals(HostRoleStatus.ABORTED,stages.get(0).getHostRoleStatus(hostname, "NAMENODE")); EasyMock.verify(scheduler, entityManagerProviderMock); } @@ -503,23 +511,7 @@ public class TestActionScheduler { doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - String host = (String) invocation.getArguments()[0]; - String role = (String) invocation.getArguments()[3]; - //HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role); - for (HostRoleCommand command : stages.get(0).getOrderedHostRoleCommands()) { - if (command.getHostName().equals(host) && command.getRole().name() - .equals(role)) { - command.setStatus(HostRoleStatus.TIMEDOUT); - } - } - return null; - } - }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString(), anyBoolean()); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Long requestId = (Long) invocation.getArguments()[0]; + Long requestId = (Long) invocation.getArguments()[1]; for (Stage stage : stages) { if (requestId.equals(stage.getRequestId())) { for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { @@ -534,7 +526,7 @@ public class TestActionScheduler { return null; } - }).when(db).abortOperation(anyLong()); + }).when(db).abortHostRole(anyString(), anyLong(), anyLong(), anyString(), 
anyString()); ArgumentCaptor<ServiceComponentHostEvent> eventsCapture1 = ArgumentCaptor.forClass(ServiceComponentHostEvent.class); @@ -549,12 +541,12 @@ public class TestActionScheduler { int cycleCount=0; while (!(stages.get(0).getHostRoleStatus(hostname1, "DATANODE") - .equals(HostRoleStatus.TIMEDOUT) && stages.get(0).getHostRoleStatus + .equals(HostRoleStatus.ABORTED) && stages.get(0).getHostRoleStatus (hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { scheduler.doWork(); } - Assert.assertEquals(HostRoleStatus.TIMEDOUT, + Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE")); Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname2, "NAMENODE")); @@ -910,9 +902,7 @@ public class TestActionScheduler { EasyMock.expect(fsm.getCluster(EasyMock.anyString())).andReturn(cluster).anyTimes(); EasyMock.expect(fsm.getHost(EasyMock.anyString())).andReturn(host); EasyMock.expect(cluster.getService(EasyMock.anyString())).andReturn(null); - EasyMock.expect(host.getLastRegistrationTime()).andReturn(HOST_REGISTRATION_TIME); EasyMock.expect(host.getHostName()).andReturn(Stage.INTERNAL_HOSTNAME).anyTimes(); - EasyMock.expect(host.getState()).andReturn(HostState.HEALTHY); if (RoleCommand.ACTIONEXECUTE.equals(roleCommand)) { EasyMock.expect(cluster.getClusterName()).andReturn("clusterName").anyTimes();
