Repository: ambari Updated Branches: refs/heads/branch-3.0-perf f0def7cec -> 6ecac18cb
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index c680776..9284f33 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -74,6 +74,7 @@ import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.events.AmbariEvent; +import org.apache.ambari.server.events.publishers.AgentCommandsPublisher; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.metadata.RoleCommandOrder; import org.apache.ambari.server.metadata.RoleCommandOrderProvider; @@ -190,6 +191,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(fsm.getClusterById(anyLong())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); @@ -230,7 +232,8 @@ public class TestActionScheduler { //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm, - 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null); + 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, + null, agentCommandsPublisher); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); @@ -278,6 +281,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); RoleCommandOrderProvider rcoProvider = mock(RoleCommandOrderProvider.class); RoleCommandOrder rco = mock(RoleCommandOrder.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); @@ -330,7 +334,7 @@ public class TestActionScheduler { //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm, 10000, new HostsMap((String) null), unitOfWork, null, conf, - entityManagerProviderMock, hostRoleCommandDAOMock, null, rcoProvider); + entityManagerProviderMock, hostRoleCommandDAOMock, null, rcoProvider, agentCommandsPublisher); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); @@ -393,6 +397,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -445,7 +450,8 @@ public class TestActionScheduler { //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null, + agentCommandsPublisher); scheduler.setTaskTimeoutAdjustment(false); // Start the thread @@ -487,6 +493,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -532,7 +539,8 @@ public class TestActionScheduler { //Small action timeout to test rescheduling AmbariEventPublisher aep = EasyMock.createNiceMock(AmbariEventPublisher.class); ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, hostRoleCommandDAOMock, null, + agentCommandsPublisher); scheduler.setTaskTimeoutAdjustment(false); int cycleCount=0; @@ -584,6 +592,7 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(hostname2)).thenReturn(sch2); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -653,7 +662,7 @@ public class TestActionScheduler { // Make sure the NN install doesn't timeout ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); scheduler.setTaskTimeoutAdjustment(false); int cycleCount=0; @@ -709,6 +718,7 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); Clusters fsm = mock(Clusters.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); Map<String, String> payload = new HashMap<>(); final Stage s = getStageWithServerAction(1, 977, payload, "test", 1200, false, false); @@ -770,7 +780,7 @@ public class TestActionScheduler { ServerActionExecutor.init(injector); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION") @@ -798,6 +808,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -850,7 +861,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf, - entityManagerProviderMock, hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + entityManagerProviderMock, hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); scheduler.doWork(); @@ -870,6 +881,7 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); Clusters fsm = mock(Clusters.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); Map<String, String> payload = new HashMap<>(); payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout"); @@ -933,7 +945,7 @@ public class TestActionScheduler { ServerActionExecutor.init(injector); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState() @@ -954,9 +966,9 @@ public class TestActionScheduler { ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class) .withConstructor(long.class, long.class, ActionDBAccessor.class, ActionQueue.class, Clusters.class, int.class, HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, - Provider.class, HostRoleCommandDAO.class, HostRoleCommandFactory.class) + Provider.class, HostRoleCommandDAO.class, HostRoleCommandFactory.class, AgentCommandsPublisher.class) .withArgs(100L, 50L, null, null, null, -1, null, null, null, null, entityManagerProviderMock, - mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class)) + mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class), mock(AgentCommandsPublisher.class)) .createNiceMock(); EasyMock.replay(scheduler); @@ -1052,7 +1064,7 @@ public class TestActionScheduler { //aq.enqueue(Stage.INTERNAL_HOSTNAME, s.getExecutionCommands(null).get(0).getExecutionCommand()); List<ExecutionCommand> commandsToSchedule = new ArrayList<>(); - Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create(); + Multimap<Long, AgentCommand> commandsToEnqueue = ArrayListMultimap.create(); boolean taskShouldBeSkipped = stageSupportsAutoSkip && autoSkipFailedTask; db.timeoutHostRole(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong(), @@ -1063,10 +1075,10 @@ public class TestActionScheduler { ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class) .withConstructor(long.class, long.class, ActionDBAccessor.class, ActionQueue.class, Clusters.class, int.class, HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, - Provider.class, HostRoleCommandDAO.class, HostRoleCommandFactory.class) + Provider.class, HostRoleCommandDAO.class, HostRoleCommandFactory.class,AgentCommandsPublisher.class) .withArgs(100L, 50L, db, aq, fsm, -1, null, null, ambariEventPublisher, null, entityManagerProviderMock, mock(HostRoleCommandDAO.class), - mock(HostRoleCommandFactory.class)) + mock(HostRoleCommandFactory.class), mock(AgentCommandsPublisher.class)) .createNiceMock(); EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost); @@ -1085,6 +1097,7 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); Clusters fsm = mock(Clusters.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); Map<String, String> payload = new HashMap<>(); payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception"); @@ -1147,7 +1160,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION") @@ -1199,7 +1212,9 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + //when(fsm.getHost(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); @@ -1262,7 +1277,7 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null)); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -1287,6 +1302,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -1298,7 +1314,7 @@ public class TestActionScheduler { String hostname3 = "chost.ambari.apache.org"; String hostname4 = "chost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = - new HashMap<>(); + new HashMap<>(); hosts.put(hostname1, sch); hosts.put(hostname2, sch); hosts.put(hostname3, sch); @@ -1307,13 +1323,13 @@ public class TestActionScheduler { List<Stage> stages = new ArrayList<>(); Stage stage = getStageWithSingleTask( - hostname1, "cluster1", Role.HIVE_CLIENT, - RoleCommand.INSTALL, Service.Type.HIVE, 1, 1, 1); + hostname1, "cluster1", Role.HIVE_CLIENT, + RoleCommand.INSTALL, Service.Type.HIVE, 1, 1, 1); Map<String, String> hiveSite = new TreeMap<>(); hiveSite.put("javax.jdo.option.ConnectionPassword", "password"); hiveSite.put("hive.server2.thrift.port", "10000"); Map<String, Map<String, String>> configurations = - new TreeMap<>(); + new TreeMap<>(); configurations.put("hive-site", hiveSite); stage.getExecutionCommands(hostname1).get(0).getExecutionCommand().setConfigurations(configurations); stages.add(stage); @@ -1354,9 +1370,9 @@ public class TestActionScheduler { properties.put(Configuration.PARALLEL_STAGE_EXECUTION.getKey(), "false"); Configuration conf = new Configuration(properties); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null)); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -1369,7 +1385,7 @@ public class TestActionScheduler { Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname3, "DATANODE")); Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR")); Assert.assertFalse(stages.get(0).getExecutionCommands(hostname1).get(0).getExecutionCommand(). - getConfigurations().containsKey("javax.jdo.option.ConnectionPassword")); + getConfigurations().containsKey("javax.jdo.option.ConnectionPassword")); } /** * Verifies that ActionScheduler allows to execute background tasks in parallel @@ -1383,6 +1399,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -1392,7 +1409,7 @@ public class TestActionScheduler { String hostname1 = "ahost.ambari.apache.org"; String hostname2 = "bhost.ambari.apache.org"; HashMap<String, ServiceComponentHost> hosts = - new HashMap<>(); + new HashMap<>(); hosts.put(hostname1, sch); hosts.put(hostname2, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); @@ -1434,7 +1451,7 @@ public class TestActionScheduler { ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null)); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -1455,6 +1472,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AmbariEventPublisher ambariEventPublisher = mock(AmbariEventPublisher.class); RequestFactory requestFactory = mock(RequestFactory.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); @@ -1463,7 +1481,7 @@ public class TestActionScheduler { when(serviceObj.getCluster()).thenReturn(oneClusterMock); HashMap<String, ServiceComponentHost> hosts = - new HashMap<>(); + new HashMap<>(); hosts.put(hostname, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); @@ -1566,14 +1584,14 @@ public class TestActionScheduler { Capture<Collection<HostRoleCommand>> cancelCommandList = EasyMock.newCapture(); ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class). withConstructor((long)100, (long)50, db, aq, fsm, 3, - new HostsMap((String) null), + new HostsMap((String) null), unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf, entityManagerProviderMock, mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class)). - addMockedMethod("cancelHostRoleCommands"). - createMock(); + addMockedMethod("cancelHostRoleCommands"). + createMock(); scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList), - EasyMock.eq(ActionScheduler.FAILED_TASK_ABORT_REASONING)); + EasyMock.eq(ActionScheduler.FAILED_TASK_ABORT_REASONING)); EasyMock.expectLastCall().once(); EasyMock.replay(scheduler); @@ -1607,6 +1625,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -1617,7 +1636,7 @@ public class TestActionScheduler { String host2 = "host2"; Host host = mock(Host.class); HashMap<String, ServiceComponentHost> hosts = - new HashMap<>(); + new HashMap<>(); hosts.put(host1, sch); hosts.put(host2, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); @@ -1752,7 +1771,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); scheduler.doWork(); @@ -1824,6 +1843,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); RequestFactory requestFactory = mock(RequestFactory.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); @@ -1835,7 +1855,7 @@ public class TestActionScheduler { long now = System.currentTimeMillis(); Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "testRequestFailureBasedOnSuccessFactor", - "", ""); + "", ""); stage.setStageId(1); stage.addHostRoleExecutionCommand("host1", Role.DATANODE, RoleCommand.UPGRADE, new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host1", now, "HDP-0.2"), @@ -1948,7 +1968,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); ActionManager am = new ActionManager(db, requestFactory, scheduler); @@ -1986,7 +2006,7 @@ public class TestActionScheduler { private Stage createStage(String clusterName, int stageId, int requestId) { Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, "getStageWithSingleTask", - "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); + "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); stage.setStageId(stageId); return stage; } @@ -2011,7 +2031,7 @@ public class TestActionScheduler { private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, RoleCommand roleCommand, - String customCommandName, Service.Type service, int taskId, int stageId, int requestId) { + String customCommandName, Service.Type service, int taskId, int stageId, int requestId) { Stage stage = getStageWithSingleTask(hostname, clusterName, role, roleCommand, service, taskId, stageId, requestId); HostRoleCommand cmd = stage.getHostRoleCommand(hostname, role.name()); @@ -2099,6 +2119,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -2135,7 +2156,7 @@ public class TestActionScheduler { //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); @@ -2220,7 +2241,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAO, (HostRoleCommandFactory) null); + hostRoleCommandDAO, (HostRoleCommandFactory) null, null); final CountDownLatch abortCalls = new CountDownLatch(2); @@ -2271,6 +2292,7 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); Clusters fsm = mock(Clusters.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); Map<String, String> payload = new HashMap<>(); final Stage s = getStageWithServerAction(1, 977, payload, "test", 300, false, false); @@ -2332,7 +2354,7 @@ public class TestActionScheduler { ServerActionExecutor.init(injector); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION") @@ -2354,6 +2376,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); HostRoleCommandDAO hostRoleCommandDAO = mock(HostRoleCommandDAO.class); HostRoleCommandFactory hostRoleCommandFactory = mock(HostRoleCommandFactory.class); @@ -2363,8 +2386,11 @@ public class TestActionScheduler { when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); when(serviceObj.getCluster()).thenReturn(oneClusterMock); + final Long hostId = 1L; + HostEntity hostEntity = new HostEntity(); hostEntity.setHostName(hostname); + //hostEntity.setHostId(hostId); hostDAO.create(hostEntity); HashMap<String, ServiceComponentHost> hosts = new HashMap<>(); @@ -2430,6 +2456,7 @@ public class TestActionScheduler { when(fsm.getHost(anyString())).thenReturn(host); when(host.getState()).thenReturn(HostState.HEALTHY); when(host.getHostName()).thenReturn(hostname); + when(host.getHostId()).thenReturn(hostId); ActionDBAccessor db = mock(ActionDBAccessor.class); @@ -2518,6 +2545,19 @@ public class TestActionScheduler { return abortedCommands; } }).when(db).abortOperation(anyLong()); + Map<Long, List<AgentCommand>> commands = new HashMap<>(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Long hostId = (Long) invocation.getArguments()[0]; + if (!commands.containsKey(hostId)) { + commands.put(hostId, new ArrayList<>()); + } + commands.get(hostId).add((AgentCommand) invocation.getArguments()[1]); + return null; + } + }).when(agentCommandsPublisher).sendAgentCommand(anyLong(), any(AgentCommand.class)); Properties properties = new Properties(); Configuration conf = new Configuration(properties); @@ -2527,7 +2567,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAO, hostRoleCommandFactory); + hostRoleCommandDAO, hostRoleCommandFactory, agentCommandsPublisher); scheduler.doWork(); @@ -2540,9 +2580,9 @@ public class TestActionScheduler { Assert.assertEquals(HostRoleStatus.ABORTED, allStages.get(1).getHostRoleStatus(hostname, "NAMENODE")); Assert.assertEquals(HostRoleStatus.ABORTED, allStages.get(2).getHostRoleStatus(hostname, "DATANODE")); - Assert.assertEquals(aq.size(hostname), 1); // Cancel commands should be generated only for 1 stage + Assert.assertEquals(1, commands.get(hostId).size()); // Cancel commands should be generated only for 1 stage - CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname); + CancelCommand cancelCommand = (CancelCommand) commands.get(hostId).get(0); Assert.assertEquals(cancelCommand.getTargetTaskId(), namenodeCmdTaskId); Assert.assertEquals(cancelCommand.getReason(), reason); } @@ -2557,6 +2597,7 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -2703,7 +2744,7 @@ public class TestActionScheduler { ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null)); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -2773,7 +2814,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null, null); HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE); hrc1.setStatus(HostRoleStatus.COMPLETED); @@ -2806,7 +2847,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null, null); HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE); hrc1.setStatus(HostRoleStatus.COMPLETED); @@ -2833,11 +2874,16 @@ public class TestActionScheduler { ActionQueue aq = new ActionQueue(); Clusters fsm = mock(Clusters.class); Cluster oneClusterMock = mock(Cluster.class); + Host host = mock(Host.class); Service serviceObj = mock(Service.class); ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + AgentCommandsPublisher agentCommandsPublisher = mock(AgentCommandsPublisher.class); when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(fsm.getHost(anyString())).thenReturn(host); + when(host.getHostId()).thenReturn(1L); + when(host.getState()).thenReturn(HostState.HEALTHY); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); @@ -2953,7 +2999,7 @@ public class TestActionScheduler { ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, - hostRoleCommandDAOMock, (HostRoleCommandFactory)null)); + hostRoleCommandDAOMock, (HostRoleCommandFactory)null, agentCommandsPublisher)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -3001,7 +3047,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, null, null, 3, new HostsMap((String) null), null, null, null, entityManagerProviderMock, - (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null, null, null); replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand); @@ -3020,7 +3066,7 @@ public class TestActionScheduler { expect(nextStage.getStageId()).andReturn(0L); ActionScheduler scheduler = new ActionScheduler(100, 50, null, null, null, 3, - new HostsMap((String) null), null, null, null, entityManagerProviderMock, null, null); + new HostsMap((String) null), null, null, null, entityManagerProviderMock, null, null, null); replay(nextStage); @@ -3058,7 +3104,7 @@ public class TestActionScheduler { expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F); ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, null, null, 3, - new HostsMap((String) null), null, null, null, entityManagerProviderMock, null, null); + new HostsMap((String) null), null, null, null, entityManagerProviderMock, null, null, null); replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentSessionManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentSessionManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentSessionManagerTest.java index 89f8998..f06a887 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentSessionManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentSessionManagerTest.java @@ -42,21 +42,22 @@ public class AgentSessionManagerTest { @Test public void hostIsRegistered() throws HostNotRegisteredException { String sessionId = "session ID"; - String hostName = "example.com"; + Long hostId = 1L; Host host = EasyMock.createNiceMock(Host.class); - expect(host.getHostName()).andReturn(hostName).anyTimes(); + expect(host.getHostId()).andReturn(hostId).anyTimes(); replay(host); underTest.register(sessionId, host); assertTrue(underTest.isRegistered(sessionId)); - assertEquals(sessionId, underTest.getSessionId(hostName)); + assertEquals(sessionId, underTest.getSessionId(hostId)); assertSame(host, underTest.getHost(sessionId)); } @Test(expected = HostNotRegisteredException.class) public void exceptionThrownForUnknownHost() throws HostNotRegisteredException { - underTest.getSessionId("not registered host"); + Long notRegisteredHostId = 2L; + underTest.getSessionId(notRegisteredHostId); } @Test(expected = HostNotRegisteredException.class) @@ -68,32 +69,32 @@ public class AgentSessionManagerTest { public void registerRemovesOldSessionId() throws HostNotRegisteredException { String oldSessionId = "old session ID"; String newSessionId = "new session ID"; - String hostName = "example.com"; + Long hostId = 1L; Host host = EasyMock.createNiceMock(Host.class); - expect(host.getHostName()).andReturn(hostName).anyTimes(); + expect(host.getHostId()).andReturn(hostId).anyTimes(); replay(host); underTest.register(oldSessionId, host); underTest.register(newSessionId, host); assertFalse(underTest.isRegistered(oldSessionId)); - assertEquals(newSessionId, underTest.getSessionId(hostName)); + assertEquals(newSessionId, underTest.getSessionId(hostId)); assertSame(host, underTest.getHost(newSessionId)); } @Test(expected = HostNotRegisteredException.class) public void unregisterRemovesSessionId() throws HostNotRegisteredException { String sessionId = "session ID"; - String hostName = "example.com"; + Long hostId = 1L; Host host = EasyMock.createNiceMock(Host.class); - expect(host.getHostName()).andReturn(hostName).anyTimes(); + expect(host.getHostId()).andReturn(hostId).anyTimes(); replay(host); underTest.register(sessionId, host); - underTest.unregisterByHost(hostName); + underTest.unregisterByHost(hostId); assertFalse(underTest.isRegistered(sessionId)); - underTest.getSessionId(sessionId); + underTest.getSessionId(hostId); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java index 87a7f9e..fc60758 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.events.listeners.tasks; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; import java.util.ArrayList; import java.util.Collections; @@ -35,17 +36,14 @@ import org.apache.ambari.server.events.TaskCreateEvent; import org.apache.ambari.server.events.TaskUpdateEvent; import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; import org.apache.ambari.server.events.publishers.TaskEventPublisher; -import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; import org.apache.ambari.server.orm.dao.HostDAO; -import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.StageDAO; import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.StageEntityPK; import org.apache.ambari.server.state.ServiceComponentHostEvent; -import org.apache.ambari.server.topology.TopologyManager; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Assert; @@ -57,7 +55,6 @@ import com.google.inject.Inject; public class TaskStatusListenerTest extends EasyMockSupport { private TaskEventPublisher publisher = new TaskEventPublisher(); - private StateUpdateEventPublisher statePublisher = new StateUpdateEventPublisher(); @Inject private ExecutionCommandDAO executionCommandDAO; @@ -71,7 +68,9 @@ public class TaskStatusListenerTest extends EasyMockSupport { List<HostRoleCommand> hostRoleCommands = new ArrayList<>(); ServiceComponentHostEvent serviceComponentHostEvent = createNiceMock(ServiceComponentHostEvent.class); HostDAO hostDAO = createNiceMock(HostDAO.class); - replayAll(); + + EasyMock.replay(hostDAO); + EasyMock.replay(serviceComponentHostEvent); int hostRoleCommandSize = 3; int hrcCounter = 1; @@ -90,12 +89,10 @@ public class TaskStatusListenerTest extends EasyMockSupport { HostRoleStatus hostRoleStatus = HostRoleStatus.PENDING; StageDAO stageDAO = createNiceMock(StageDAO.class); - HostRoleCommandDAO hostRoleCommandDAO = createNiceMock(HostRoleCommandDAO.class); - TopologyManager topologyManager = createNiceMock(TopologyManager.class); RequestDAO requestDAO = createNiceMock(RequestDAO.class); StageEntity stageEntity = createNiceMock(StageEntity.class); - ClusterDAO clusterDAO = createNiceMock(ClusterDAO.class); RequestEntity requestEntity = createNiceMock(RequestEntity.class); + StateUpdateEventPublisher statePublisher = createNiceMock(StateUpdateEventPublisher.class); EasyMock.expect(stageEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();; EasyMock.expect(stageEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes(); EasyMock.expect(stageEntity.isSkippable()).andReturn(Boolean.FALSE).anyTimes();; @@ -105,19 +102,17 @@ public class TaskStatusListenerTest extends EasyMockSupport { EasyMock.expect(requestEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes(); EasyMock.expect(requestDAO.findByPK(anyLong())).andReturn(requestEntity).anyTimes(); - requestDAO.updateStatus(1L,HostRoleStatus.COMPLETED,HostRoleStatus.SKIPPED_FAILED); - EasyMock.expectLastCall().times(1); - - + EasyMock.expect(requestDAO.updateStatus(eq(1L), eq(HostRoleStatus.COMPLETED), + eq(HostRoleStatus.SKIPPED_FAILED))).andReturn(new RequestEntity()).times(1); EasyMock.replay(stageEntity); EasyMock.replay(requestEntity); EasyMock.replay(stageDAO); EasyMock.replay(requestDAO); + EasyMock.replay(statePublisher); TaskCreateEvent event = new TaskCreateEvent(hostRoleCommands); - TaskStatusListener listener = new TaskStatusListener(publisher,stageDAO,requestDAO,statePublisher, - hostRoleCommandDAO,topologyManager, clusterDAO); + TaskStatusListener listener = new TaskStatusListener(publisher,stageDAO,requestDAO,statePublisher); Assert.assertTrue(listener.getActiveTasksMap().isEmpty()); Assert.assertTrue(listener.getActiveStageMap().isEmpty()); http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java index 25e9dbf..fc59e65 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog300Test.java @@ -130,12 +130,14 @@ public class UpgradeCatalog300Test { Method showHcatDeletedUserMessage = UpgradeCatalog300.class.getDeclaredMethod("showHcatDeletedUserMessage"); Method setStatusOfStagesAndRequests = UpgradeCatalog300.class.getDeclaredMethod("setStatusOfStagesAndRequests"); Method updateLogSearchConfigs = UpgradeCatalog300.class.getDeclaredMethod("updateLogSearchConfigs"); + Method updateHostComponentLastStateTable = UpgradeCatalog300.class.getDeclaredMethod("updateHostComponentLastStateTable"); UpgradeCatalog300 upgradeCatalog300 = createMockBuilder(UpgradeCatalog300.class) .addMockedMethod(showHcatDeletedUserMessage) .addMockedMethod(addNewConfigurationsFromXml) .addMockedMethod(setStatusOfStagesAndRequests) .addMockedMethod(updateLogSearchConfigs) + .addMockedMethod(updateHostComponentLastStateTable) .createMock(); @@ -144,6 +146,7 @@ public class UpgradeCatalog300Test { upgradeCatalog300.setStatusOfStagesAndRequests(); upgradeCatalog300.updateLogSearchConfigs(); + upgradeCatalog300.updateHostComponentLastStateTable(); expectLastCall().once(); replay(upgradeCatalog300); @@ -168,6 +171,9 @@ public class UpgradeCatalog300Test { Capture<DBAccessor.DBColumnInfo> hrcOpsDisplayNameColumn = newCapture(); dbAccessor.addColumn(eq(UpgradeCatalog300.HOST_ROLE_COMMAND_TABLE), capture(hrcOpsDisplayNameColumn)); + Capture<DBAccessor.DBColumnInfo> lastValidColumn = newCapture(); + dbAccessor.addColumn(eq(UpgradeCatalog300.COMPONENT_LAST_STATE_COLUMN), capture(lastValidColumn)); + dbAccessor.dropColumn(COMPONENT_DESIRED_STATE_TABLE, SECURITY_STATE_COLUMN); expectLastCall().once(); dbAccessor.dropColumn(COMPONENT_STATE_TABLE, SECURITY_STATE_COLUMN); expectLastCall().once(); dbAccessor.dropColumn(SERVICE_DESIRED_STATE_TABLE, SECURITY_STATE_COLUMN); expectLastCall().once(); @@ -183,6 +189,11 @@ public class UpgradeCatalog300Test { Assert.assertEquals(null, capturedOpsDisplayNameColumn.getDefaultValue()); Assert.assertEquals(String.class, capturedOpsDisplayNameColumn.getType()); + DBAccessor.DBColumnInfo capturedLastValidColumn = lastValidColumn.getValue(); + Assert.assertEquals(UpgradeCatalog300.HRC_OPS_DISPLAY_NAME_COLUMN, capturedLastValidColumn.getName()); + Assert.assertEquals(null, capturedLastValidColumn.getDefaultValue()); + Assert.assertEquals(String.class, capturedLastValidColumn.getType()); + verify(dbAccessor); }
