Repository: ambari Updated Branches: refs/heads/trunk 382e7f3df -> 9bbb43e57
AMBARI-6828. Postfixes for server-side implementation of Cancel requests (dlysnichenko) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9bbb43e5 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9bbb43e5 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9bbb43e5 Branch: refs/heads/trunk Commit: 9bbb43e57bb6071fabcb23c12ffb222fb33cccdb Parents: 382e7f3 Author: Lisnichenko Dmitro <dlysniche...@hortonworks.com> Authored: Fri Aug 8 15:13:24 2014 +0300 Committer: Lisnichenko Dmitro <dlysniche...@hortonworks.com> Committed: Tue Aug 12 19:13:38 2014 +0300 ---------------------------------------------------------------------- .../actionmanager/ActionDBAccessorImpl.java | 5 + .../server/actionmanager/ActionScheduler.java | 2 +- .../actionmanager/TestActionDBAccessorImpl.java | 33 +- .../actionmanager/TestActionScheduler.java | 193 +++++++++-- .../server/agent/TestHeartbeatHandler.java | 323 +++++++++++++++---- 5 files changed, 464 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index dae9048..5e879cc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -346,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { long now = System.currentTimeMillis(); List<Long> requestsToCheck = new ArrayList<Long>(); + List<Long> abortedCommandUpdates = new ArrayList<Long>(); List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet()); for (HostRoleCommandEntity commandEntity : commandEntities) { @@ -354,6 +355,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { // We don't want to overwrite statuses for ABORTED tasks with // statuses that have been received from the agent after aborting task commandEntity.setStatus(HostRoleStatus.valueOf(report.getStatus())); + } else { + abortedCommandUpdates.add(commandEntity.getTaskId()); } commandEntity.setStdOut(report.getStdOut().getBytes()); commandEntity.setStdError(report.getStdErr().getBytes()); @@ -375,6 +378,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } hostRoleCommandDAO.mergeAll(commandEntities); + // Invalidate cache because of updates to ABORTED commands + hostRoleCommandCache.invalidateAll(abortedCommandUpdates); for (Long requestId : requestsToCheck) { endRequestIfCompleted(requestId); http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index cab891f..b9a67b7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -794,7 +794,7 @@ class ActionScheduler implements Runnable { * @param hostRoleCommands a list of hostRoleCommands * @param reason why the request is being cancelled */ - private void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) { + void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) { for (HostRoleCommand hostRoleCommand : hostRoleCommands) { if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED) { // Dequeue all tasks that have been already scheduled for sending to agent http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java index a94f421..2850897 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java @@ -120,6 +120,37 @@ public class TestActionDBAccessorImpl { Stage s = db.getAllStages(requestId).get(0); assertEquals(HostRoleStatus.COMPLETED,s.getHostRoleStatus(hostname, "HBASE_MASTER")); } + + @Test + public void testCancelCommandReport() throws AmbariException { + String hostname = "host1"; + populateActionDB(db, hostname, requestId, stageId); + Stage stage = db.getAllStages(requestId).get(0); + Assert.assertEquals(stageId, stage.getStageId()); + stage.setHostRoleStatus(hostname, "HBASE_MASTER", HostRoleStatus.ABORTED); + db.hostRoleScheduled(stage, hostname, "HBASE_MASTER"); + List<CommandReport> reports = new ArrayList<CommandReport>(); + CommandReport cr = new CommandReport(); + cr.setTaskId(1); + cr.setActionId(StageUtils.getActionId(requestId, stageId)); + cr.setRole("HBASE_MASTER"); + cr.setStatus("COMPLETED"); + cr.setStdErr(""); + cr.setStdOut(""); + cr.setExitCode(0); + reports.add(cr); + am.processTaskResponse(hostname, reports, stage.getOrderedHostRoleCommands()); + assertEquals(0, + am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER")); + assertEquals("HostRoleStatus should remain ABORTED " + + "(command report status should be ignored)", + HostRoleStatus.ABORTED, am.getAction(requestId, stageId) + .getHostRoleStatus(hostname, "HBASE_MASTER")); + Stage s = db.getAllStages(requestId).get(0); + assertEquals("HostRoleStatus should remain ABORTED " + + "(command report status should be ignored)", + HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER")); + } @Test public void testGetStagesInProgress() throws AmbariException { @@ -129,8 +160,6 @@ public class TestActionDBAccessorImpl { stages.add(createStubStage(hostname, requestId, stageId + 1)); Request request = new Request(stages, clusters); db.persistActions(request); - - List<Stage> stages2 = db.getStagesInProgress(); assertEquals(2, stages.size()); } http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 60ed592..a536bef 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -44,13 +45,14 @@ import org.apache.ambari.server.ServiceComponentHostNotFoundException; import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats; import org.apache.ambari.server.agent.ActionQueue; import org.apache.ambari.server.agent.AgentCommand; +import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; +import org.apache.ambari.server.agent.CancelCommand; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; -import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; import org.apache.ambari.server.configuration.Configuration; -import org.apache.ambari.server.controller.AmbariCustomCommandExecutionHelper; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.serveraction.ServerAction; +import org.apache.ambari.server.serveraction.ServerActionManager; import org.apache.ambari.server.serveraction.ServerActionManagerImpl; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; @@ -60,11 +62,12 @@ import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.ServiceComponentHostEvent; -import org.apache.ambari.server.state.ServiceComponentHostEventType; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; import org.apache.ambari.server.utils.StageUtils; +import org.easymock.Capture; +import org.easymock.EasyMock; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -82,6 +85,7 @@ public class TestActionScheduler { private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org," + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org," + " c6402.ambari.apache.org]}"; + private final String hostname = "ahost.ambari.apache.org"; /** @@ -111,7 +115,6 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; Host host = mock(Host.class); HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); @@ -193,7 +196,6 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; Host host = mock(Host.class); HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); @@ -258,7 +260,6 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; Host host = mock(Host.class); HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); @@ -286,10 +287,15 @@ public class TestActionScheduler { } }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString()); + ServerActionManager sam = EasyMock.createNiceMock(ServerActionManager.class); //Small action timeout to test rescheduling - ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), null, unitOfWork, conf); + ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class). + withConstructor((long) 100, (long) 50, db, aq, fsm, 3, + new HostsMap((String) null), sam, unitOfWork, conf). + addMockedMethod("cancelHostRoleCommands"). + createMock(); + EasyMock.replay(scheduler); scheduler.setTaskTimeoutAdjustment(false); // Start the thread scheduler.start(); @@ -298,10 +304,11 @@ public class TestActionScheduler { .equals(HostRoleStatus.TIMEDOUT)) { Thread.sleep(100L); } -// assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"), -// HostRoleStatus.TIMEDOUT); scheduler.stop(); + + //Check that cancelHostRoleCommands() was not called + EasyMock.verify(scheduler); } @Test @@ -465,7 +472,6 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); hosts.put(hostname, sch); @@ -527,7 +533,6 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); hosts.put(hostname, sch); @@ -830,7 +835,6 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); hosts.put(hostname, sch); @@ -915,11 +919,22 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); - ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), new ServerActionManagerImpl(fsm), - unitOfWork, conf); + ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm); + + Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>(); + ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class). + withConstructor((long)100, (long)50, db, aq, fsm, 3, + new HostsMap((String) null), serverActionManager, + unitOfWork, conf). + addMockedMethod("cancelHostRoleCommands"). + createMock(); + scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList), + EasyMock.eq(ActionScheduler.FAILED_TASK_ABORT_REASONING)); + EasyMock.expectLastCall().once(); + EasyMock.replay(scheduler); + ActionManager am = new ActionManager( - 2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf); + 2, 2, aq, fsm, db, new HostsMap((String) null), serverActionManager, unitOfWork, requestFactory, conf); scheduler.doWork(); @@ -930,6 +945,8 @@ public class TestActionScheduler { scheduler.doWork(); Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE")); Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE")); + Assert.assertEquals(cancelCommandList.getValue().size(), 1); + EasyMock.verify(scheduler); } /** @@ -1398,7 +1415,6 @@ public class TestActionScheduler { when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; Host host = mock(Host.class); HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); @@ -1556,7 +1572,6 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); - String hostname = "ahost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); hosts.put(hostname, sch); @@ -1598,8 +1613,148 @@ public class TestActionScheduler { scheduler.stop(); assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.COMPLETED); + } + + @Test + public void testCancelRequests() throws Exception { + ActionQueue aq = new ActionQueue(); + Clusters fsm = mock(Clusters.class); + Cluster oneClusterMock = mock(Cluster.class); + Service serviceObj = mock(Service.class); + ServiceComponent scomp = mock(ServiceComponent.class); + ServiceComponentHost sch = mock(ServiceComponentHost.class); + UnitOfWork unitOfWork = mock(UnitOfWork.class); + RequestFactory requestFactory = mock(RequestFactory.class); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); + when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); + when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); + when(serviceObj.getCluster()).thenReturn(oneClusterMock); + + HashMap<String, ServiceComponentHost> hosts = + new HashMap<String, ServiceComponentHost>(); + hosts.put(hostname, sch); + when(scomp.getServiceComponentHosts()).thenReturn(hosts); + + long requestId = 1; + + final List<Stage> stages = new ArrayList<Stage>(); + int namenodeCmdTaskId = 1; + stages.add( + getStageWithSingleTask( + hostname, "cluster1", Role.NAMENODE, RoleCommand.START, + Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId)); + stages.add( + getStageWithSingleTask( + hostname, "cluster1", Role.DATANODE, RoleCommand.START, + Service.Type.HDFS, 2, 2, (int)requestId)); + + Host host = mock(Host.class); + when(fsm.getHost(anyString())).thenReturn(host); + when(host.getState()).thenReturn(HostState.HEALTHY); + when(host.getHostName()).thenReturn(hostname); + + ActionDBAccessor db = mock(ActionDBAccessor.class); + + when(db.getStagesInProgress()).thenReturn(stages); + + List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>(); + for (Stage stage : stages) { + requestTasks.addAll(stage.getOrderedHostRoleCommands()); + } + when(db.getRequestTasks(anyLong())).thenReturn(requestTasks); + when(db.getAllStages(anyLong())).thenReturn(stages); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0]; + for (CommandReport report : reports) { + String actionId = report.getActionId(); + long[] requestStageIds = StageUtils.getRequestStage(actionId); + Long requestId = requestStageIds[0]; + Long stageId = requestStageIds[1]; + String role = report.getRole(); + Long id = report.getTaskId(); + for (Stage stage : stages) { + if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { + for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) { + if (hostRoleCommand.getTaskId() == id) { + hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus())); + } + } + } + } + } + + return null; + } + }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class)); + + when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Long taskId = (Long) invocation.getArguments()[0]; + for (Stage stage : stages) { + for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { + if (taskId.equals(command.getTaskId())) { + return command; + } + } + } + return null; + } + }); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Long requestId = (Long) invocation.getArguments()[0]; + for (Stage stage : stages) { + if (requestId.equals(stage.getRequestId())) { + for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { + if (command.getStatus() == HostRoleStatus.QUEUED || + command.getStatus() == HostRoleStatus.IN_PROGRESS || + command.getStatus() == HostRoleStatus.PENDING) { + command.setStatus(HostRoleStatus.ABORTED); + } + } + } + } + + return null; + } + }).when(db).abortOperation(anyLong()); + + Properties properties = new Properties(); + Configuration conf = new Configuration(properties); + ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm); + + ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), serverActionManager, unitOfWork, conf); + + ActionManager am = new ActionManager( + 2, 2, aq, fsm, db, new HostsMap((String) null), + serverActionManager, unitOfWork, requestFactory, conf); + + scheduler.doWork(); + + + + //List<CommandReport> reports = new ArrayList<CommandReport>(); + //reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1)); + //am.processTaskResponse(hostname, reports, stages.get(0).getOrderedHostRoleCommands()); + String reason = "Some reason"; + + scheduler.scheduleCancellingRequest(requestId, reason); + + scheduler.doWork(); + Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE")); + Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE")); + Assert.assertEquals(aq.size(hostname), 1); // Cancel commands should be generated only for 1 stage + CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname); + Assert.assertEquals(cancelCommand.getTargetTaskId(), namenodeCmdTaskId); + Assert.assertEquals(cancelCommand.getReason(), reason); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/9bbb43e5/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java index 349f09d..5c4a4f1 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java @@ -36,6 +36,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -60,6 +61,7 @@ import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.actionmanager.ActionDBAccessor; import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl; import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.actionmanager.Request; import org.apache.ambari.server.actionmanager.RequestFactory; @@ -70,6 +72,7 @@ import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; +import org.apache.ambari.server.serveraction.ServerActionManager; import org.apache.ambari.server.state.Alert; import org.apache.ambari.server.state.AlertState; import org.apache.ambari.server.state.Cluster; @@ -88,9 +91,9 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; import org.apache.ambari.server.utils.StageUtils; import org.codehaus.jackson.JsonGenerationException; +import static org.easymock.EasyMock.*; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,8 +139,11 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testHeartbeat() throws Exception { ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn(new ArrayList<HostRoleCommand>()); + replay(am); Clusters fsm = clusters; fsm.addHost(DummyHostname1); Host hostObject = clusters.getHost(DummyHostname1); @@ -190,10 +196,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testHeartbeatWithConfigs() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -211,8 +215,7 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); - + ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS). @@ -220,7 +223,6 @@ public class TestHeartbeatHandler { serviceComponentHost1.setState(State.INSTALLED); serviceComponentHost2.setState(State.INSTALLED); - HeartBeat hb = new HeartBeat(); hb.setResponseId(0); hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus)); @@ -245,7 +247,18 @@ public class TestHeartbeatHandler { reports.add(cr); hb.setReports(reports); - + + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); // the heartbeat test passed if actual configs is populated @@ -254,10 +267,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testHeartbeatCustomCommandWithConfigs() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -275,7 +286,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -284,7 +294,6 @@ public class TestHeartbeatHandler { serviceComponentHost1.setState(State.INSTALLED); serviceComponentHost2.setState(State.INSTALLED); - HeartBeat hb = new HeartBeat(); hb.setResponseId(0); hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus)); @@ -326,6 +335,18 @@ public class TestHeartbeatHandler { reports.add(crn); hb.setReports(reports); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); // the heartbeat test passed if actual configs is populated @@ -336,10 +357,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testHeartbeatCustomStartStop() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -357,7 +376,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -405,6 +423,18 @@ public class TestHeartbeatHandler { assertTrue(serviceComponentHost1.isRestartRequired()); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); // the heartbeat test passed if actual configs is populated @@ -417,9 +447,8 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testStatusHeartbeat() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -437,7 +466,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -472,6 +500,18 @@ public class TestHeartbeatHandler { componentStatuses.add(componentStatus2); hb.setComponentStatus(componentStatuses); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); State componentState1 = serviceComponentHost1.getState(); State componentState2 = serviceComponentHost2.getState(); @@ -482,9 +522,8 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testStatusHeartbeatWithAnnotation() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -499,7 +538,6 @@ public class TestHeartbeatHandler { hdfs.addServiceComponent(SECONDARY_NAMENODE).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); HeartBeat hb = new HeartBeat(); hb.setTimestamp(System.currentTimeMillis()); @@ -510,6 +548,17 @@ public class TestHeartbeatHandler { ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>(); hb.setComponentStatus(componentStatuses); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}).anyTimes(); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); HeartBeatResponse resp = handler.handleHeartBeat(hb); Assert.assertFalse(resp.hasMappedComponents()); @@ -531,8 +580,8 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testLiveStatusUpdateAfterStopFailed() throws Exception { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -550,7 +599,6 @@ public class TestHeartbeatHandler { addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters. getCluster(DummyCluster).getService(HDFS). @@ -589,6 +637,18 @@ public class TestHeartbeatHandler { hb.setComponentStatus(componentStatuses); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); State componentState1 = serviceComponentHost1.getState(); State componentState2 = serviceComponentHost2.getState(); @@ -656,6 +716,7 @@ public class TestHeartbeatHandler { public void testRegistration() throws AmbariException, InvalidStateTransitionException { ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am, injector); @@ -687,6 +748,7 @@ public class TestHeartbeatHandler { InvalidStateTransitionException { ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am, injector); @@ -727,6 +789,7 @@ public class TestHeartbeatHandler { @Test public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException { ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am, injector); @@ -759,6 +822,7 @@ public class TestHeartbeatHandler { public void testInvalidOSRegistration() throws AmbariException, InvalidStateTransitionException { ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am, injector); @@ -787,6 +851,7 @@ public class TestHeartbeatHandler { InvalidStateTransitionException { ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am, injector); @@ -814,6 +879,7 @@ public class TestHeartbeatHandler { public void testRegisterNewNode() throws AmbariException, InvalidStateTransitionException { ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; fsm.addHost(DummyHostname1); Host hostObject = clusters.getHost(DummyHostname1); @@ -906,6 +972,7 @@ public class TestHeartbeatHandler { when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds); ActionManager am = getMockActionManager(); + replay(am); Clusters fsm = clusters; ActionQueue actionQueue = new ActionQueue(); HeartBeatHandler handler = new HeartBeatHandler(fsm, actionQueue, am, @@ -930,9 +997,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testTaskInProgressHandling() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -950,7 +1016,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -977,14 +1042,25 @@ public class TestHeartbeatHandler { hb.setReports(reports); hb.setComponentStatus(new ArrayList<ComponentStatus>()); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); State componentState1 = serviceComponentHost1.getState(); assertEquals("Host state should still be installing", State.INSTALLING, componentState1); } @Test + @SuppressWarnings("unchecked") public void testOPFailedEventForAbortedTask() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1002,7 +1078,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -1041,6 +1116,18 @@ public class TestHeartbeatHandler { reports.add(cr); hb.setReports(reports); hb.setComponentStatus(new ArrayList<ComponentStatus>()); + + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); State componentState1 = serviceComponentHost1.getState(); assertEquals("Host state should still be installing", State.INSTALLING, @@ -1055,10 +1142,9 @@ public class TestHeartbeatHandler { * @throws InvalidStateTransitionException */ @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testCommandReportOnHeartbeatUpdatedState() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1072,7 +1158,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -1100,6 +1185,17 @@ public class TestHeartbeatHandler { hb.setReports(reports); hb.setComponentStatus(new ArrayList<ComponentStatus>()); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}).anyTimes(); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); assertEquals("Host state should be " + State.INSTALLED, State.INSTALLED, serviceComponentHost1.getState()); @@ -1171,9 +1267,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1187,7 +1282,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist(); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS). getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -1214,6 +1308,17 @@ public class TestHeartbeatHandler { hb.setReports(reports); hb.setComponentStatus(new ArrayList<ComponentStatus>()); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}).anyTimes(); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); assertEquals("Host state should be " + State.UPGRADING, State.UPGRADING, serviceComponentHost1.getState()); @@ -1237,8 +1342,6 @@ public class TestHeartbeatHandler { assertEquals("Host state should be " + State.UPGRADING, State.UPGRADING, serviceComponentHost1.getState()); - // TODO What happens when there is a TIMEDOUT - serviceComponentHost1.setState(State.UPGRADING); hb.setTimestamp(System.currentTimeMillis()); hb.setResponseId(3); @@ -1261,8 +1364,8 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testStatusHeartbeatWithVersion() throws Exception { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1321,6 +1424,11 @@ public class TestHeartbeatHandler { hb.setComponentStatus(componentStatuses); ActionQueue aq = new ActionQueue(); + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + }}); + replay(am); HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); assertEquals("Matching value " + serviceComponentHost1.getStackVersion(), @@ -1333,9 +1441,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1403,6 +1510,17 @@ public class TestHeartbeatHandler { hb.setReports(reports); ActionQueue aq = new ActionQueue(); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); assertEquals("Stack version for SCH should be updated to " + @@ -1413,9 +1531,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1481,6 +1598,17 @@ public class TestHeartbeatHandler { hb.setReports(reports); ActionQueue aq = new ActionQueue(); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); assertEquals("State of SCH not change while operation is in progress", @@ -1493,8 +1621,8 @@ public class TestHeartbeatHandler { @Test + @SuppressWarnings("unchecked") public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); Cluster cluster = getDummyCluster(); @SuppressWarnings("serial") @@ -1575,9 +1703,6 @@ public class TestHeartbeatHandler { cr1.setStdOut("dummy output"); cr1.setExitCode(0); -// actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId, -// Role.DATANODE.name(), cr1); - CommandReport cr2 = new CommandReport(); cr2.setActionId(StageUtils.getActionId(requestId, stageId)); cr2.setTaskId(2); @@ -1593,10 +1718,18 @@ public class TestHeartbeatHandler { reports.add(cr2); hb.setReports(reports); -// actionDBAccessor.updateHostRoleState(DummyHostname1, requestId, stageId, -// Role.NAMENODE.name(), cr2); - ActionQueue aq = new ActionQueue(); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); assertEquals("State of SCH should change after fail report", @@ -1612,9 +1745,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testProcessStatusReports() throws Exception { - ActionManager am = getMockActionManager(); Clusters fsm = clusters; Cluster cluster = getDummyCluster(); @@ -1631,7 +1763,17 @@ public class TestHeartbeatHandler { ActionQueue aq = new ActionQueue(); + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}).anyTimes(); + replay(am); HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector); + Register reg = new Register(); HostInfo hi = new HostInfo(); hi.setHostName(DummyHostname1); @@ -1744,6 +1886,13 @@ public class TestHeartbeatHandler { handler.handleHeartBeat(hb1); assertEquals(HostHealthStatus.HealthStatus.HEALTHY.name(), hostObject.getStatus()); + reset(am); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}).anyTimes(); + replay(am); + //Only one component reported status hdfs.getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1).setState(State.INSTALLED); HeartBeat hb4 = new HeartBeat(); @@ -1789,10 +1938,8 @@ public class TestHeartbeatHandler { } @Test - @Ignore //TODO (dlysnichenko) : fix + @SuppressWarnings("unchecked") public void testIgnoreCustomActionReport() throws AmbariException, InvalidStateTransitionException { - ActionManager am = getMockActionManager(); - CommandReport cr1 = new CommandReport(); cr1.setActionId(StageUtils.getActionId(requestId, stageId)); cr1.setTaskId(1); @@ -1828,6 +1975,18 @@ public class TestHeartbeatHandler { hb.setReports(reports); ActionQueue aq = new ActionQueue(); + + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + add(command); + }}); + replay(am); + HeartBeatHandler handler = getHeartBeatHandler(am, aq); // CUSTOM_COMMAND and ACTIONEXECUTE reports are ignored @@ -1888,9 +2047,18 @@ public class TestHeartbeatHandler { } private ActionManager getMockActionManager() { - return new ActionManager(0, 0, null, null, - actionDBAccessor, new HostsMap((String) null), null, unitOfWork, - injector.getInstance(RequestFactory.class), null); + ActionQueue actionQueueMock = createNiceMock(ActionQueue.class); + Clusters clustersMock = createNiceMock(Clusters.class); + ServerActionManager serverActionManagerMock = createNiceMock(ServerActionManager.class); + Configuration configurationMock = createNiceMock(Configuration.class); + + ActionManager actionManager = createMockBuilder(ActionManager.class). + addMockedMethod("getTasks"). + withConstructor((long)0, (long)0, actionQueueMock, clustersMock, + actionDBAccessor, new HostsMap((String) null), serverActionManagerMock, unitOfWork, + injector.getInstance(RequestFactory.class), configurationMock). + createMock(); + return actionManager; } @@ -1944,9 +2112,8 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testCommandStatusProcesses() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); Host hostObject = clusters.getHost(DummyHostname1); clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName()); @@ -1957,7 +2124,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); HeartBeat hb = new HeartBeat(); hb.setTimestamp(System.currentTimeMillis()); @@ -1965,7 +2131,6 @@ public class TestHeartbeatHandler { hb.setHostname(DummyHostname1); hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus)); hb.setReports(new ArrayList<CommandReport>()); - List<Map<String, String>> procs = new ArrayList<Map<String, String>>(); Map<String, String> proc1info = new HashMap<String, String>(); @@ -1992,7 +2157,18 @@ public class TestHeartbeatHandler { componentStatus1.setExtra(extra); componentStatuses.add(componentStatus1); hb.setComponentStatus(componentStatuses); - + + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}).anyTimes(); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); handler.handleHeartBeat(hb); ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); @@ -2028,9 +2204,8 @@ public class TestHeartbeatHandler { } @Test + @SuppressWarnings("unchecked") public void testCommandStatusProcesses_empty() throws Exception { - ActionManager am = getMockActionManager(); - Cluster cluster = getDummyCluster(); Host hostObject = clusters.getHost(DummyHostname1); clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName()); @@ -2041,8 +2216,6 @@ public class TestHeartbeatHandler { hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED); ActionQueue aq = new ActionQueue(); - HeartBeatHandler handler = getHeartBeatHandler(am, aq); - HeartBeat hb = new HeartBeat(); hb.setTimestamp(System.currentTimeMillis()); hb.setResponseId(0); @@ -2060,9 +2233,19 @@ public class TestHeartbeatHandler { componentStatuses.add(componentStatus1); hb.setComponentStatus(componentStatuses); - - handler.handleHeartBeat(hb); - ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); + + final HostRoleCommand command = new HostRoleCommand(DummyHostname1, + Role.DATANODE, null, null); + + ActionManager am = getMockActionManager(); + expect(am.getTasks(anyObject(List.class))).andReturn( + new ArrayList<HostRoleCommand>() {{ + add(command); + }}); + replay(am); + + HeartBeatHandler handler = getHeartBeatHandler(am, aq); + ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); Assert.assertEquals(Integer.valueOf(0), Integer.valueOf(sch.getProcesses().size())); }