Repository: tez Updated Branches: refs/heads/master 09102e517 -> 9058460f1
TEZ-3944. TestTaskScheduler times-out on Hadoop3 (Jonathan Eagles via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9058460f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9058460f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9058460f Branch: refs/heads/master Commit: 9058460f12043a96effb1bbc5fa019f9067b0c39 Parents: 09102e5 Author: Kuhu Shukla <[email protected]> Authored: Thu Jun 7 11:42:49 2018 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Thu Jun 7 11:42:49 2018 -0500 ---------------------------------------------------------------------- .../tez/dag/app/rm/TestTaskScheduler.java | 405 +++++++------------ 1 file changed, 157 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9058460f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 1a647b1..49f8fe3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -155,12 +155,12 @@ public class TestTaskScheduler { Assert.assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount()); - Object mockTask1 = mock(Object.class); - Object mockCookie1 = mock(Object.class); - Resource mockCapability = mock(Resource.class); + Object mockTask1 = new MockTask("task1"); + Object mockCookie1 = new Object(); + Resource mockCapability = Resource.newInstance(1024, 1); String[] hosts = {"host1", "host5"}; String[] racks = {"/default-rack", "/default-rack"}; - Priority mockPriority = mock(Priority.class); + Priority mockPriority = Priority.newInstance(1); 
ArgumentCaptor<CookieContainerRequest> requestCaptor = ArgumentCaptor.forClass(CookieContainerRequest.class); // allocate task @@ -187,10 +187,10 @@ public class TestTaskScheduler { releaseAssignedContainer((ContainerId) any()); // allocate tasks - Object mockTask2 = mock(Object.class); - Object mockCookie2 = mock(Object.class); - Object mockTask3 = mock(Object.class); - Object mockCookie3 = mock(Object.class); + Object mockTask2 = new MockTask("task2"); + Object mockCookie2 = new Object(); + Object mockTask3 = new MockTask("task3"); + Object mockCookie3 = new Object(); scheduler.allocateTask(mockTask1, mockCapability, hosts, racks, mockPriority, null, mockCookie1); drainableAppCallback.drain(); @@ -210,26 +210,23 @@ public class TestTaskScheduler { addContainerRequest(requestCaptor.capture()); CookieContainerRequest request3 = requestCaptor.getValue(); - List<Container> containers = new ArrayList<Container>(); - Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); - ContainerId mockCId1 = mock(ContainerId.class); - when(mockContainer1.getId()).thenReturn(mockCId1); + NodeId host1 = NodeId.newInstance("host1", 1); + NodeId host2 = NodeId.newInstance("host2", 2); + NodeId host3 = NodeId.newInstance("host3", 3); + NodeId host4 = NodeId.newInstance("host4", 4); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId mockCId1 = ContainerId.newContainerId(attemptId, 1); + Container mockContainer1 = Container.newInstance(mockCId1, host1, null, mockCapability, mockPriority, null); + ContainerId mockCId2 = ContainerId.newContainerId(attemptId, 2); + Container mockContainer2 = Container.newInstance(mockCId2, host2, null, mockCapability, mockPriority, null); + ContainerId mockCId3 = ContainerId.newContainerId(attemptId, 3); + Container mockContainer3 = Container.newInstance(mockCId3, host3, null, mockCapability, mockPriority, 
null); + ContainerId mockCId4 = ContainerId.newContainerId(attemptId, 4); + Container mockContainer4 = Container.newInstance(mockCId4, host4, null, mockCapability, mockPriority, null); + List<Container> containers = new ArrayList<>(); containers.add(mockContainer1); - Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); - ContainerId mockCId2 = mock(ContainerId.class); - when(mockContainer2.getId()).thenReturn(mockCId2); containers.add(mockContainer2); - Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer3.getNodeId().getHost()).thenReturn("host3"); - ContainerId mockCId3 = mock(ContainerId.class); - when(mockContainer3.getId()).thenReturn(mockCId3); containers.add(mockContainer3); - Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer4.getNodeId().getHost()).thenReturn("host4"); - ContainerId mockCId4 = mock(ContainerId.class); - when(mockContainer4.getId()).thenReturn(mockCId4); containers.add(mockContainer4); scheduler.onContainersAllocated(containers); drainableAppCallback.drain(); @@ -284,21 +281,17 @@ public class TestTaskScheduler { // verify blacklisting verify(mockRMClient, times(0)).addNodeToBlacklist((NodeId)any()); String badHost = "host6"; - NodeId badNodeId = mock(NodeId.class); - when(badNodeId.getHost()).thenReturn(badHost); + NodeId badNodeId = NodeId.newInstance(badHost, 1); scheduler.blacklistNode(badNodeId); verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId); - Object mockTask4 = mock(Object.class); - Object mockCookie4 = mock(Object.class); + Object mockTask4 = new MockTask("task4"); + Object mockCookie4 = new Object(); scheduler.allocateTask(mockTask4, mockCapability, null, null, mockPriority, null, mockCookie4); drainableAppCallback.drain(); verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture()); - Container mockContainer5 = mock(Container.class, 
RETURNS_DEEP_STUBS); - when(mockContainer5.getNodeId().getHost()).thenReturn(badHost); - when(mockContainer5.getNodeId()).thenReturn(badNodeId); - ContainerId mockCId5 = mock(ContainerId.class); - when(mockContainer5.getId()).thenReturn(mockCId5); + ContainerId mockCId5 = ContainerId.newContainerId(attemptId, 5); + Container mockContainer5 = Container.newInstance(mockCId5, badNodeId, null, mockCapability, mockPriority, null); containers.clear(); containers.add(mockContainer5); scheduler.onContainersAllocated(containers); @@ -310,10 +303,9 @@ public class TestTaskScheduler { verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any()); // verify request added back verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture()); - Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer6.getNodeId().getHost()).thenReturn("host7"); - ContainerId mockCId6 = mock(ContainerId.class); - when(mockContainer6.getId()).thenReturn(mockCId6); + ContainerId mockCId6 = ContainerId.newContainerId(attemptId, 6); + NodeId host7 = NodeId.newInstance("host7", 7); + Container mockContainer6 = Container.newInstance(mockCId6, host7, null, mockCapability, mockPriority, null); containers.clear(); containers.add(mockContainer6); scheduler.onContainersAllocated(containers); @@ -407,21 +399,19 @@ public class TestTaskScheduler { scheduler.start(); drainableAppCallback.drain(); - Object mockTask1 = mock(Object.class); - when(mockTask1.toString()).thenReturn("task1"); - Object mockCookie1 = mock(Object.class); - Resource mockCapability = mock(Resource.class); + Object mockTask1 = new MockTask("task1"); + Object mockCookie1 = new Object(); + Resource mockCapability = Resource.newInstance(1024, 1); String[] hosts = {"host1", "host5"}; String[] racks = {"/default-rack", "/default-rack"}; final Priority mockPriority1 = Priority.newInstance(1); final Priority mockPriority2 = Priority.newInstance(2); final Priority mockPriority3 = 
Priority.newInstance(3); - Object mockTask2 = mock(Object.class); - when(mockTask2.toString()).thenReturn("task2"); - Object mockCookie2 = mock(Object.class); - Object mockTask3 = mock(Object.class); - when(mockTask3.toString()).thenReturn("task3"); - Object mockCookie3 = mock(Object.class); + Priority mockPriority = Priority.newInstance(1); + Object mockTask2 = new MockTask("task2"); + Object mockCookie2 = new Object(); + Object mockTask3 = new MockTask("task3"); + Object mockCookie3 = new Object(); ArgumentCaptor<CookieContainerRequest> requestCaptor = ArgumentCaptor.forClass(CookieContainerRequest.class); @@ -446,21 +436,14 @@ public class TestTaskScheduler { List<Container> containers = new ArrayList<Container>(); // sending lower priority container first to make sure its not matched - Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer1.getPriority()).thenReturn(mockPriority1); - when(mockContainer1.toString()).thenReturn("container1"); - ContainerId mockCId1 = mock(ContainerId.class); - when(mockContainer1.getId()).thenReturn(mockCId1); - when(mockCId1.toString()).thenReturn("container1"); + NodeId host1 = NodeId.newInstance("host1", 1); + NodeId host2 = NodeId.newInstance("host2", 2); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId mockCId1 = ContainerId.newContainerId(attemptId, 1); + Container mockContainer1 = Container.newInstance(mockCId1, host1, null, mockCapability, mockPriority, null); + ContainerId mockCId2 = ContainerId.newContainerId(attemptId, 2); + Container mockContainer2 = Container.newInstance(mockCId2, host2, null, mockCapability, mockPriority, null); containers.add(mockContainer1); - Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); - 
when(mockContainer2.getPriority()).thenReturn(mockPriority2); - when(mockContainer2.toString()).thenReturn("container2"); - ContainerId mockCId2 = mock(ContainerId.class); - when(mockContainer2.getId()).thenReturn(mockCId2); - when(mockCId2.toString()).thenReturn("container2"); containers.add(mockContainer2); ArrayList<CookieContainerRequest> hostContainers = @@ -521,10 +504,9 @@ public class TestTaskScheduler { scheduler.start(); drainableAppCallback.drain(); - Object mockTask1 = mock(Object.class); - when(mockTask1.toString()).thenReturn("task1"); - Object mockCookie1 = mock(Object.class); - Resource mockCapability = mock(Resource.class); + Object mockTask1 = new MockTask("task1"); + Object mockCookie1 = new Object(); + Resource mockCapability = Resource.newInstance(1024, 1); String[] hosts = {"host1", "host5"}; String[] racks = {"/default-rack", "/default-rack"}; final Priority mockPriority1 = Priority.newInstance(1); @@ -532,12 +514,10 @@ public class TestTaskScheduler { final Priority mockPriority3 = Priority.newInstance(3); final Priority mockPriority4 = Priority.newInstance(4); final Priority mockPriority5 = Priority.newInstance(5); - Object mockTask2 = mock(Object.class); - when(mockTask2.toString()).thenReturn("task2"); - Object mockCookie2 = mock(Object.class); - Object mockTask3 = mock(Object.class); - when(mockTask3.toString()).thenReturn("task3"); - Object mockCookie3 = mock(Object.class); + Object mockTask2 = new MockTask("task2"); + Object mockCookie2 = new Object(); + Object mockTask3 = new MockTask("task3"); + Object mockCookie3 = new Object(); ArgumentCaptor<CookieContainerRequest> requestCaptor = ArgumentCaptor.forClass(CookieContainerRequest.class); @@ -560,39 +540,24 @@ public class TestTaskScheduler { addContainerRequest(requestCaptor.capture()); CookieContainerRequest request3 = requestCaptor.getValue(); - List<Container> containers = new ArrayList<Container>(); + NodeId host1 = NodeId.newInstance("host1", 1); + NodeId host2 = 
NodeId.newInstance("host2", 2); + NodeId host3 = NodeId.newInstance("host3", 3); + NodeId host4 = NodeId.newInstance("host4", 4); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId mockCId1 = ContainerId.newContainerId(attemptId, 1); + Container mockContainer1 = Container.newInstance(mockCId1, host1, null, mockCapability, mockPriority1, null); + ContainerId mockCId2 = ContainerId.newContainerId(attemptId, 2); + Container mockContainer2 = Container.newInstance(mockCId2, host2, null, mockCapability, mockPriority2, null); + ContainerId mockCId3 = ContainerId.newContainerId(attemptId, 3); + Container mockContainer3 = Container.newInstance(mockCId3, host3, null, mockCapability, mockPriority3, null); + ContainerId mockCId4 = ContainerId.newContainerId(attemptId, 4); + Container mockContainer4 = Container.newInstance(mockCId4, host4, null, mockCapability, mockPriority4, null); // sending lower priority container first to make sure its not matched - Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer4.getNodeId().getHost()).thenReturn("host4"); - when(mockContainer4.toString()).thenReturn("container4"); - when(mockContainer4.getPriority()).thenReturn(mockPriority4); - ContainerId mockCId4 = mock(ContainerId.class); - when(mockContainer4.getId()).thenReturn(mockCId4); - when(mockCId4.toString()).thenReturn("container4"); + List<Container> containers = new ArrayList<Container>(); containers.add(mockContainer4); - Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer1.getPriority()).thenReturn(mockPriority1); - when(mockContainer1.toString()).thenReturn("container1"); - ContainerId mockCId1 = mock(ContainerId.class); - when(mockContainer1.getId()).thenReturn(mockCId1); - when(mockCId1.toString()).thenReturn("container1"); containers.add(mockContainer1); - Container 
mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); - when(mockContainer2.getPriority()).thenReturn(mockPriority2); - when(mockContainer2.toString()).thenReturn("container2"); - ContainerId mockCId2 = mock(ContainerId.class); - when(mockContainer2.getId()).thenReturn(mockCId2); - when(mockCId2.toString()).thenReturn("container2"); containers.add(mockContainer2); - Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer3.getNodeId().getHost()).thenReturn("host3"); - when(mockContainer3.getPriority()).thenReturn(mockPriority3); - when(mockContainer3.toString()).thenReturn("container3"); - ContainerId mockCId3 = mock(ContainerId.class); - when(mockContainer3.getId()).thenReturn(mockCId3); - when(mockCId3.toString()).thenReturn("container3"); containers.add(mockContainer3); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -652,25 +617,17 @@ public class TestTaskScheduler { // verify blacklisting verify(mockRMClient, times(0)).addNodeToBlacklist((NodeId)any()); String badHost = "host6"; - NodeId badNodeId = mock(NodeId.class); - when(badNodeId.getHost()).thenReturn(badHost); + NodeId badNodeId = NodeId.newInstance(badHost, 1); scheduler.blacklistNode(badNodeId); verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId); - Object mockTask4 = mock(Object.class); - when(mockTask4.toString()).thenReturn("task4"); - Object mockCookie4 = mock(Object.class); + Object mockTask4 = new MockTask("task4"); + Object mockCookie4 = new Object(); scheduler.allocateTask(mockTask4, mockCapability, null, null, mockPriority4, null, mockCookie4); drainableAppCallback.drain(); verify(mockRMClient, times(4)).addContainerRequest(requestCaptor.capture()); - Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer5.getNodeId().getHost()).thenReturn(badHost); - when(mockContainer5.getNodeId()).thenReturn(badNodeId); - ContainerId mockCId5 = 
mock(ContainerId.class); - when(mockContainer5.toString()).thenReturn("container5"); - when(mockCId5.toString()).thenReturn("container5"); - when(mockContainer5.getId()).thenReturn(mockCId5); - when(mockContainer5.getPriority()).thenReturn(mockPriority4); + ContainerId mockCId5 = ContainerId.newContainerId(attemptId, 5); + Container mockContainer5 = Container.newInstance(mockCId5, badNodeId, null, mockCapability, mockPriority4, null); containers.clear(); containers.add(mockContainer5); drainNotifier.set(false); @@ -684,12 +641,9 @@ public class TestTaskScheduler { verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any()); // verify request added back verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture()); - Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer6.getNodeId().getHost()).thenReturn("host7"); - ContainerId mockCId6 = mock(ContainerId.class); - when(mockContainer6.getId()).thenReturn(mockCId6); - when(mockContainer6.toString()).thenReturn("container6"); - when(mockCId6.toString()).thenReturn("container6"); + NodeId host7 = NodeId.newInstance("host7", 7); + ContainerId mockCId6 = ContainerId.newContainerId(attemptId, 6); + Container mockContainer6 = Container.newInstance(mockCId6, host7, null, mockCapability, mockPriority4, null); containers.clear(); containers.add(mockContainer6); drainNotifier.set(false); @@ -712,9 +666,8 @@ public class TestTaskScheduler { // verify container level matching // add a dummy task to prevent release of allocated containers - Object mockTask5 = mock(Object.class); - when(mockTask5.toString()).thenReturn("task5"); - Object mockCookie5 = mock(Object.class); + Object mockTask5 = new MockTask("task5"); + Object mockCookie5 = new Object(); scheduler.allocateTask(mockTask5, mockCapability, hosts, racks, mockPriority5, null, mockCookie5); verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture()); @@ -722,29 +675,19 @@ public class 
TestTaskScheduler { // add containers so that we can reference one of them for container specific // allocation containers.clear(); - Container mockContainer7 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer7.getNodeId().getHost()).thenReturn("host5"); - ContainerId mockCId7 = mock(ContainerId.class); - when(mockContainer7.toString()).thenReturn("container7"); - when(mockCId7.toString()).thenReturn("container7"); - when(mockContainer7.getId()).thenReturn(mockCId7); - when(mockContainer7.getPriority()).thenReturn(mockPriority5); + NodeId host5 = NodeId.newInstance("host5", 5); + ContainerId mockCId7 = ContainerId.newContainerId(attemptId, 7); + Container mockContainer7 = Container.newInstance(mockCId7, host5, null, mockCapability, mockPriority5, null); containers.add(mockContainer7); - Container mockContainer8 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer8.getNodeId().getHost()).thenReturn("host5"); - ContainerId mockCId8 = mock(ContainerId.class); - when(mockContainer8.toString()).thenReturn("container8"); - when(mockCId8.toString()).thenReturn("container8"); - when(mockContainer8.getId()).thenReturn(mockCId8); - when(mockContainer8.getPriority()).thenReturn(mockPriority5); + ContainerId mockCId8 = ContainerId.newContainerId(attemptId, 8); + Container mockContainer8 = Container.newInstance(mockCId8, host5, null, mockCapability, mockPriority5, null); containers.add(mockContainer8); drainNotifier.set(false); scheduler.onContainersAllocated(containers); drainableAppCallback.drain(); verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any()); - Object mockTask6 = mock(Object.class); - when(mockTask6.toString()).thenReturn("task6"); - Object mockCookie6 = mock(Object.class); + Object mockTask6 = new MockTask("task6"); + Object mockCookie6 = new Object(); // allocate request with container affinity scheduler.allocateTask(mockTask6, mockCapability, mockCId7, mockPriority5, null, mockCookie6); 
drainableAppCallback.drain(); @@ -823,69 +766,49 @@ public class TestTaskScheduler { String node1Rack3 = "n1r3"; ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); + NodeId emptyHost = NodeId.newInstance("", 1); Resource r = Resource.newInstance(0, 0); ContainerId mockCId1 = ContainerId.newInstance(appId, 0); - Container c1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c1 = Container.newInstance(mockCId1, emptyHost, null, r, null, null); HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null, containerSignatureMatcher)); when(hc1.getNode()).thenReturn(node1Rack1); when(hc1.getRack()).thenReturn(rack1); - when(c1.getId()).thenReturn(mockCId1); - when(c1.getResource()).thenReturn(r); when(hc1.getContainer()).thenReturn(c1); ContainerId mockCId2 = ContainerId.newInstance(appId, 1); - Container c2 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c2.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c2 = Container.newInstance(mockCId2, emptyHost, null, r, null, null); HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null, containerSignatureMatcher)); when(hc2.getNode()).thenReturn(node2Rack1); when(hc2.getRack()).thenReturn(rack1); - when(c2.getId()).thenReturn(mockCId2); - when(c2.getResource()).thenReturn(r); when(hc2.getContainer()).thenReturn(c2); ContainerId mockCId3 = ContainerId.newInstance(appId, 2); - Container c3 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c3.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c3 = Container.newInstance(mockCId3, emptyHost, null, r, null, null); HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null, containerSignatureMatcher)); when(hc3.getNode()).thenReturn(node1Rack1); when(hc3.getRack()).thenReturn(rack1); - when(c3.getId()).thenReturn(mockCId3); - when(c3.getResource()).thenReturn(r); 
when(hc3.getContainer()).thenReturn(c3); ContainerId mockCId4 = ContainerId.newInstance(appId, 3); - Container c4 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c4.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c4 = Container.newInstance(mockCId4, emptyHost, null, r, null, null); HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null, containerSignatureMatcher)); when(hc4.getNode()).thenReturn(node2Rack1); when(hc4.getRack()).thenReturn(rack1); - when(c4.getId()).thenReturn(mockCId4); - when(c4.getResource()).thenReturn(r); when(hc4.getContainer()).thenReturn(c4); ContainerId mockCId5 = ContainerId.newInstance(appId, 4); - Container c5 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c5.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c5 = Container.newInstance(mockCId5, emptyHost, null, r, null, null); HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null, containerSignatureMatcher)); when(hc5.getNode()).thenReturn(node1Rack2); when(hc5.getRack()).thenReturn(rack2); - when(c5.getId()).thenReturn(mockCId5); - when(c5.getResource()).thenReturn(r); when(hc5.getContainer()).thenReturn(c5); ContainerId mockCId6 = ContainerId.newInstance(appId, 5); - Container c6 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c6.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c6 = Container.newInstance(mockCId6, emptyHost, null, r, null, null); HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null, containerSignatureMatcher)); when(hc6.getNode()).thenReturn(node2Rack2); when(hc6.getRack()).thenReturn(rack2); - when(c6.getId()).thenReturn(mockCId6); - when(c6.getResource()).thenReturn(r); when(hc6.getContainer()).thenReturn(c6); ContainerId mockCId7 = ContainerId.newInstance(appId, 6); - Container c7 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c7.getNodeId().getHost()).thenReturn(""); // we are mocking directly + Container c7 = 
Container.newInstance(mockCId7, emptyHost, null, r, null, null); HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null, containerSignatureMatcher)); when(hc7.getNode()).thenReturn(node1Rack3); when(hc7.getRack()).thenReturn(rack3); - when(c7.getId()).thenReturn(mockCId7); - when(c7.getResource()).thenReturn(r); when(hc7.getContainer()).thenReturn(c7); scheduler.heldContainers.put(mockCId1, hc1); @@ -965,15 +888,13 @@ public class TestTaskScheduler { scheduler.initialize(); scheduler.start(); + Resource mockCapability = Resource.newInstance(1024, 1); + NodeId emptyHost = NodeId.newInstance("", 1); ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); ContainerId containerId = ContainerId.newInstance(appId, 0); - Container c1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly - - HeldContainer hc1 = mock(HeldContainer.class); - when(c1.getId()).thenReturn(containerId); - when(hc1.getContainer()).thenReturn(c1); - when(hc1.isNew()).thenReturn(false); + Container c1 = Container.newInstance(containerId, emptyHost, null, mockCapability, null, null); + + HeldContainer hc1 = new HeldContainer(c1, -1, -1, null, containerSignatureMatcher); // containerExpiryTime = 0 scheduler.heldContainers.put(containerId, hc1); @@ -1084,14 +1005,14 @@ public class TestTaskScheduler { verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); // allocate task - Object mockTask1 = mock(Object.class); - Object mockTask2 = mock(Object.class); - Object mockTask3 = mock(Object.class); - Object mockTask3Wait = mock(Object.class); - Object mockTask3Retry = mock(Object.class); - Object mockTask3KillA = mock(Object.class); - Object mockTask3KillB = mock(Object.class); - Object mockTaskPri8 = mock(Object.class); + Object mockTask1 = new MockTask("task1"); + Object mockTask2 = new MockTask("task2"); + Object mockTask3 = new MockTask("task3"); + Object 
mockTask3Wait = new MockTask("task3Wait"); + Object mockTask3Retry = new MockTask("task3Retry"); + Object mockTask3KillA = new MockTask("task3KillA"); + Object mockTask3KillB = new MockTask("task3KillB"); + Object mockTaskPri8 = new MockTask("taskPri8"); Object obj3 = new Object(); Priority pri2 = Priority.newInstance(2); Priority pri4 = Priority.newInstance(4); @@ -1146,35 +1067,21 @@ public class TestTaskScheduler { new LinkedList<ArrayList<CookieContainerRequest>>(); anyList.add(anyContainers); + NodeId host1 = NodeId.newInstance("host1", 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId mockCId1 = ContainerId.newContainerId(attemptId, 1); + Container mockContainer1 = Container.newInstance(mockCId1, host1, null, taskAsk, pri2, null); + ContainerId mockCId2 = ContainerId.newContainerId(attemptId, 2); + Container mockContainer2 = Container.newInstance(mockCId2, host1, null, taskAsk, pri6, null); + ContainerId mockCId3 = ContainerId.newContainerId(attemptId, 3); + Container mockContainer3 = Container.newInstance(mockCId3, host1, null, taskAsk, pri6, null); + ContainerId mockCId4 = ContainerId.newContainerId(attemptId, 4); + Container mockContainer4 = Container.newInstance(mockCId4, host1, null, taskAsk, pri2, null); List<Container> containers = new ArrayList<Container>(); - Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer1.getResource()).thenReturn(taskAsk); - when(mockContainer1.getPriority()).thenReturn(pri2); - ContainerId mockCId1 = mock(ContainerId.class); - when(mockContainer1.getId()).thenReturn(mockCId1); containers.add(mockContainer1); - Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer2.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer2.getResource()).thenReturn(taskAsk); - 
when(mockContainer2.getPriority()).thenReturn(pri6); - ContainerId mockCId2 = mock(ContainerId.class); - when(mockContainer2.getId()).thenReturn(mockCId2); containers.add(mockContainer2); - Container mockContainer3A = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer3A.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer3A.getResource()).thenReturn(taskAsk); - when(mockContainer3A.getPriority()).thenReturn(pri6); - ContainerId mockCId3A = mock(ContainerId.class); - when(mockContainer3A.getId()).thenReturn(mockCId3A); - containers.add(mockContainer3A); - Container mockContainer3B = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer3B.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer3B.getResource()).thenReturn(taskAsk); - when(mockContainer3B.getPriority()).thenReturn(pri2); // high priority container - ContainerId mockCId3B = mock(ContainerId.class); - when(mockContainer3B.getId()).thenReturn(mockCId3B); - containers.add(mockContainer3B); + containers.add(mockContainer3); + containers.add(mockContainer4); when( mockRMClient.getMatchingRequests((Priority) any(), eq("host1"), (Resource) any())).thenAnswer( @@ -1232,11 +1139,11 @@ public class TestTaskScheduler { scheduler.taskAllocations.get(mockTask1).getId()); Assert.assertEquals(mockCId2, scheduler.taskAllocations.get(mockTask3).getId()); - Assert.assertEquals(mockCId3A, + Assert.assertEquals(mockCId3, scheduler.taskAllocations.get(mockTask3KillA).getId()); // high priority container assigned to lower pri task. 
This task should still be preempted // because the task priority is relevant for preemption and not the container priority - Assert.assertEquals(mockCId3B, + Assert.assertEquals(mockCId4, scheduler.taskAllocations.get(mockTask3KillB).getId()); // no preemption @@ -1259,19 +1166,15 @@ public class TestTaskScheduler { drainableAppCallback.drain(); verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture()); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); - - Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer4.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer4.getResource()).thenReturn(taskAsk); - when(mockContainer4.getPriority()).thenReturn(pri8); - ContainerId mockCId4 = mock(ContainerId.class); - when(mockContainer4.getId()).thenReturn(mockCId4); + + ContainerId mockCId5 = ContainerId.newContainerId(attemptId, 5); + Container mockContainer5 = Container.newInstance(mockCId5, host1, null, taskAsk, pri8, null); containers.clear(); - containers.add(mockContainer4); + containers.add(mockContainer5); // new lower pri container added that wont be matched and eventually preempted // Fudge new container being present in delayed allocation list due to race - HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null, + HeldContainer heldContainer = new HeldContainer(mockContainer5, -1, -1, null, containerSignatureMatcher); scheduler.delayedContainerManager.delayedContainers.add(heldContainer); // no preemption - container assignment attempts < 3 @@ -1292,7 +1195,7 @@ public class TestTaskScheduler { scheduler.getProgress(); drainableAppCallback.drain(); verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any()); - verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4); + verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId5); // internally re-request pri8 task request because we release pri8 new container 
verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture()); CookieContainerRequest reAdded = requestCaptor.getValue(); @@ -1335,7 +1238,7 @@ public class TestTaskScheduler { scheduler.getProgress(); // third heartbeat drainableAppCallback.drain(); verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any()); - verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B); + verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId4); Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption); // there are pending preemptions. scheduler.getProgress(); // first heartbeat @@ -1345,7 +1248,7 @@ public class TestTaskScheduler { drainableAppCallback.drain(); // Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId)any()); - verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3A); + verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3); AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL); @@ -1384,9 +1287,9 @@ public class TestTaskScheduler { verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); // allocate task - Object mockTask1 = mock(Object.class); - Object mockTask2 = mock(Object.class); - Object mockTask3 = mock(Object.class); + Object mockTask1 = new MockTask("task1"); + Object mockTask2 = new MockTask("task2"); + Object mockTask3 = new MockTask("task3"); Object obj3 = new Object(); Priority pri2 = Priority.newInstance(2); Priority pri4 = Priority.newInstance(4); @@ -1411,13 +1314,11 @@ public class TestTaskScheduler { Assert.assertEquals(totalResource, scheduler.getTotalResources()); verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any()); + NodeId host1 = NodeId.newInstance("host1", 1); + ApplicationAttemptId attemptId = 
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId mockCId1 = ContainerId.newContainerId(attemptId, 1); + Container mockContainer1 = Container.newInstance(mockCId1, host1, null, taskAsk, pri4, null); List<Container> containers = new ArrayList<Container>(); - Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer1.getNodeId().getHost()).thenReturn("host1"); - when(mockContainer1.getResource()).thenReturn(taskAsk); - when(mockContainer1.getPriority()).thenReturn(pri4); - ContainerId mockCId1 = mock(ContainerId.class); - when(mockContainer1.getId()).thenReturn(mockCId1); containers.add(mockContainer1); Mockito.doAnswer(new Answer<Object>() { @@ -1536,12 +1437,12 @@ public class TestTaskScheduler { String defaultRack[] = { "/default-rack" }; String otherRack[] = { "/other-rack" }; - Object mockTask1 = mock(Object.class); + Object mockTask1 = new MockTask("task1"); CookieContainerRequest mockCookie1 = mock(CookieContainerRequest.class, RETURNS_DEEP_STUBS); when(mockCookie1.getCookie().getTask()).thenReturn(mockTask1); - Object mockTask2 = mock(Object.class); + Object mockTask2 = new MockTask("task2"); CookieContainerRequest mockCookie2 = mock(CookieContainerRequest.class, RETURNS_DEEP_STUBS); when(mockCookie2.getCookie().getTask()).thenReturn(mockTask2); @@ -1640,17 +1541,15 @@ public class TestTaskScheduler { scheduler.start(); drainableAppCallback.drain(); - Object mockTask1 = mock(Object.class); - when(mockTask1.toString()).thenReturn("task1"); - Object mockCookie1 = mock(Object.class); - Resource mockCapability = mock(Resource.class); + Object mockTask1 = new MockTask("task1"); + Object mockCookie1 = new Object(); + Resource mockCapability = Resource.newInstance(1024, 1); String[] hosts = {"host1", "host5"}; String[] racks = {"/default-rack", "/default-rack"}; final Priority mockPriority1 = Priority.newInstance(1); final Priority mockPriority2 = Priority.newInstance(2); - Object mockTask2 = 
mock(Object.class); - when(mockTask2.toString()).thenReturn("task2"); - Object mockCookie2 = mock(Object.class); + Object mockTask2 = new MockTask("task2"); + Object mockCookie2 = new Object(); ArgumentCaptor<CookieContainerRequest> requestCaptor = ArgumentCaptor.forClass(CookieContainerRequest.class); @@ -1669,13 +1568,10 @@ public class TestTaskScheduler { List<Container> containers = new ArrayList<Container>(); // sending only lower priority container to make sure its not matched - Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS); - when(mockContainer2.getNodeId().getHost()).thenReturn("host2"); - when(mockContainer2.getPriority()).thenReturn(mockPriority2); - when(mockContainer2.toString()).thenReturn("container2"); - ContainerId mockCId2 = mock(ContainerId.class); - when(mockContainer2.getId()).thenReturn(mockCId2); - when(mockCId2.toString()).thenReturn("container2"); + NodeId host2 = NodeId.newInstance("host2", 2); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId mockCId2 = ContainerId.newContainerId(attemptId, 2); + Container mockContainer2 = Container.newInstance(mockCId2, host2, null, mockCapability, mockPriority2, null); containers.add(mockContainer2); scheduler.onContainersAllocated(containers); @@ -1713,4 +1609,17 @@ public class TestTaskScheduler { + ":0", resource, priority, null); return container; } + + static class MockTask { + final String name; + + MockTask(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } }
