Repository: ambari Updated Branches: refs/heads/branch-1.7.0 3a9d5c5ec -> 78f76f02c refs/heads/trunk 2d4f7d4b8 -> 06d61c1ad
AMBARI-7828. TestActionScheduler fails occasionally on builds.a.o stating expected:<ABORTED> but was:<PENDING> (dlysnichenko) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f0887efb Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f0887efb Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f0887efb Branch: refs/heads/branch-1.7.0 Commit: f0887efb78ac69aadb700107d1ea4380949e038d Parents: 3a9d5c5 Author: Lisnichenko Dmitro <[email protected]> Authored: Fri Oct 24 12:48:47 2014 +0300 Committer: Lisnichenko Dmitro <[email protected]> Committed: Fri Oct 24 12:48:47 2014 +0300 ---------------------------------------------------------------------- .../actionmanager/TestActionScheduler.java | 95 ++++++++++---------- 1 file changed, 46 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f0887efb/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index b9a38a4..7224924 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.reflect.TypeToken; import com.google.inject.persist.UnitOfWork; import junit.framework.Assert; +import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.ServiceComponentHostNotFoundException; @@ -86,6 +87,7 @@ public class TestActionScheduler { + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org," + " c6402.ambari.apache.org]}"; private final String hostname = "ahost.ambari.apache.org"; + private final int MAX_CYCLE_ITERATIONS = 100; /** @@ -138,19 +140,17 @@ public class TestActionScheduler { //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling - ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, + ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm, 10000, new HostsMap((String) null), null, unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); - // Start the thread - scheduler.start(); - List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1); + List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId()); assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); //The action status has not changed, it should be queued again. - ac = waitForQueueSize(hostname, aq, 1); + ac = waitForQueueSize(hostname, aq, 1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId()); assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); @@ -160,13 +160,13 @@ public class TestActionScheduler { ac = aq.dequeueAll(hostname); //Wait for sometime, it shouldn't be scheduled this time. - ac = waitForQueueSize(hostname, aq, 0); - scheduler.stop(); + ac = waitForQueueSize(hostname, aq, 0, scheduler); } private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq, - int expectedQueueSize) throws InterruptedException { - while (true) { + int expectedQueueSize, ActionScheduler scheduler) { + int cycleCount = 0; + while (cycleCount++ <= MAX_CYCLE_ITERATIONS) { List<AgentCommand> ac = aq.dequeueAll(hostname); if (ac != null) { if (ac.size() == expectedQueueSize) { @@ -176,16 +176,19 @@ public class TestActionScheduler { + ac.size()); } } - Thread.sleep(100); + try { + scheduler.doWork(); + } catch (AmbariException e) { + Assert.fail("Ambari exception : " + e.getMessage() + e.getStackTrace()); + } } + return null; } /** * Test whether scheduler times out an action */ @Test - // BUG-15597 - @Ignore public void testActionTimeout() throws Exception { ActionQueue aq = new ActionQueue(); Properties properties = new Properties(); @@ -218,6 +221,10 @@ public class TestActionScheduler { ActionDBAccessor db = mock(ActionDBAccessor.class); when(db.getStagesInProgress()).thenReturn(stages); + Request request = mock(Request.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequest(anyLong())).thenReturn(request); + doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { @@ -231,15 +238,15 @@ public class TestActionScheduler { //Small action timeout to test rescheduling - ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3, new HostsMap((String) null), null, unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); // Start the thread - scheduler.start(); + int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(hostname, "NAMENODE") - .equals(HostRoleStatus.TIMEDOUT)) { - Thread.sleep(100); + .equals(HostRoleStatus.TIMEDOUT) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); } assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"), HostRoleStatus.TIMEDOUT); @@ -247,7 +254,6 @@ public class TestActionScheduler { verify(db, times(1)).startRequest(eq(1L)); verify(db, times(1)).abortOperation(1L); - scheduler.stop(); } @Test @@ -307,24 +313,23 @@ public class TestActionScheduler { new HostsMap((String) null), sam, unitOfWork, conf). addMockedMethod("cancelHostRoleCommands"). createMock(); + scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class)); + EasyMock.expectLastCall(); EasyMock.replay(scheduler); scheduler.setTaskTimeoutAdjustment(false); - // Start the thread - scheduler.start(); + int cycleCount=0; while (!stages.get(0).getHostRoleStatus(hostname, "NAMENODE") - .equals(HostRoleStatus.TIMEDOUT)) { - Thread.sleep(100L); + .equals(HostRoleStatus.TIMEDOUT) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); } - scheduler.stop(); + Assert.assertEquals(HostRoleStatus.TIMEDOUT,stages.get(0).getHostRoleStatus(hostname, "NAMENODE")); - //Check that cancelHostRoleCommands() was not called EasyMock.verify(scheduler); } @Test - @Ignore // This is temporary disabled as discussed here https://reviews.apache.org/r/26510/ public void testOpFailedEventRaisedForAbortedHostRole() throws Exception { ActionQueue aq = new ActionQueue(); Properties properties = new Properties(); @@ -362,7 +367,7 @@ public class TestActionScheduler { final List<Stage> stages = new ArrayList<Stage>(); Stage stage = new Stage(1, "/tmp", "cluster1", 1L, "stageWith2Tasks", - CLUSTER_HOST_INFO, "", ""); + CLUSTER_HOST_INFO, "{\"command_param\":\"param_value\"}", "{\"host_param\":\"param_value\"}"); addInstallTaskToStage(stage, hostname1, "cluster1", Role.DATANODE, RoleCommand.INSTALL, Service.Type.HDFS, 1); addInstallTaskToStage(stage, hostname2, "cluster1", Role.NAMENODE, @@ -422,14 +427,12 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, new HostsMap((String) null), null, unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); - // Start the thread - scheduler.start(); - - while (!stages.get(0).getHostRoleStatus(hostname1, "DATANODE") - .equals(HostRoleStatus.TIMEDOUT) && !stages.get(0).getHostRoleStatus - (hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) { - Thread.sleep(100L); + int cycleCount=0; + while (!(stages.get(0).getHostRoleStatus(hostname1, "DATANODE") + .equals(HostRoleStatus.TIMEDOUT) && stages.get(0).getHostRoleStatus + (hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); } Assert.assertEquals(HostRoleStatus.TIMEDOUT, @@ -466,7 +469,6 @@ public class TestActionScheduler { Assert.assertNotNull("Namenode should be in Install failed state.", namenodeFailedEvent); - scheduler.stop(); } /** @@ -524,14 +526,13 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf); - scheduler.start(); + int cycleCount=0; while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION") - .equals(HostRoleStatus.COMPLETED)) { - Thread.sleep(100); + .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); } - scheduler.stop(); assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.COMPLETED); @@ -588,13 +589,12 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf); - scheduler.start(); + int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION") - .equals(HostRoleStatus.FAILED)) { - Thread.sleep(100); + .equals(HostRoleStatus.FAILED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); } - scheduler.stop(); assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.FAILED); assertEquals("test", stages.get(0).getRequestContext()); @@ -1499,10 +1499,8 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, 10000, new HostsMap((String) null), null, unitOfWork, conf); scheduler.setTaskTimeoutAdjustment(false); - // Start the thread - scheduler.start(); - List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1); + List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId()); @@ -1513,7 +1511,7 @@ public class TestActionScheduler { when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2)); //Verify that ActionSheduler does not return cached value of cluster host info for new requestId - ac = waitForQueueSize(hostname, aq, 1); + ac = waitForQueueSize(hostname, aq, 1, scheduler); assertTrue(ac.get(0) instanceof ExecutionCommand); assertEquals(String.valueOf(requestId2) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId()); assertEquals(clusterHostInfo2, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo()); @@ -1675,14 +1673,13 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf); - scheduler.start(); + int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION") - .equals(HostRoleStatus.COMPLETED)) { - Thread.sleep(100); + .equals(HostRoleStatus.COMPLETED) && cycleCount <= MAX_CYCLE_ITERATIONS) { + scheduler.doWork(); } - scheduler.stop(); assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"), HostRoleStatus.COMPLETED); }
