TEZ-2713. Add tests for node handling when there are multiple schedulers. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/919b3ed3 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/919b3ed3 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/919b3ed3 Branch: refs/heads/TEZ-2003 Commit: 919b3ed316b9f4c93aecd5a30383a4b3971dbfc6 Parents: 815c7cd Author: Siddharth Seth <[email protected]> Authored: Wed Aug 12 10:24:33 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 14 13:49:07 2015 -0700 ---------------------------------------------------------------------- .../tez/dag/app/rm/node/AMNodeTracker.java | 8 + .../tez/dag/app/rm/node/TestAMNodeTracker.java | 275 +++++++++++++++---- 2 files changed, 231 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/919b3ed3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index 751276e..1aa8472 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -116,6 +116,14 @@ public class AMNodeTracker extends AbstractService implements return perSourceNodeTrackers.get(schedulerId).get(nodeId); } + /** + * Retrieve the number of nodes from this source on which containers may be running + * + * This number may differ from the total number of nodes available from the source + * + * @param schedulerId the schedulerId for which the node count is required + * @return the number of nodes from the scheduler on which containers have been allocated + */ public int getNumNodes(int schedulerId) { return perSourceNodeTrackers.get(schedulerId).getNumNodes(); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/919b3ed3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java index 84d2e1f..def80da 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java @@ -19,6 +19,9 @@ package org.apache.tez.dag.app.rm.node; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; @@ -123,6 +126,55 @@ public class TestAMNodeTracker { } @Test (timeout = 5000) + public void testMultipleSourcesNodeRegistration() { + AppContext appContext = mock(AppContext.class); + AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + + amNodeTracker.init(new Configuration(false)); + amNodeTracker.start(); + + NodeId nodeId1 = NodeId.newInstance("source01", 3333); + NodeId nodeId2 = NodeId.newInstance("source02", 3333); + + amNodeTracker.nodeSeen(nodeId1, 0); + amNodeTracker.nodeSeen(nodeId2, 1); + + assertEquals(1, amNodeTracker.getNumNodes(0)); + assertEquals(1, amNodeTracker.getNumNodes(1)); + assertNotNull(amNodeTracker.get(nodeId1, 0)); + assertNull(amNodeTracker.get(nodeId2, 0)); + assertNull(amNodeTracker.get(nodeId1, 1)); + assertNotNull(amNodeTracker.get(nodeId2, 1)); + } + + @Test (timeout = 5000) + public void testMultipleSourcesNodeCountUpdated() { + AppContext appContext = mock(AppContext.class); + AMNodeTracker amNodeTracker = new 
AMNodeTracker(eventHandler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + + amNodeTracker.init(new Configuration(false)); + amNodeTracker.start(); + + NodeId nodeId1 = NodeId.newInstance("source01", 3333); + NodeId nodeId2 = NodeId.newInstance("source02", 3333); + + amNodeTracker.nodeSeen(nodeId1, 0); + amNodeTracker.nodeSeen(nodeId2, 1); + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(10, 0)); + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(20, 1)); + + // NodeCountUpdate does not reflect in getNumNodes. + assertEquals(1, amNodeTracker.getNumNodes(0)); + assertEquals(1, amNodeTracker.getNumNodes(1)); + assertNotNull(amNodeTracker.get(nodeId1, 0)); + assertNull(amNodeTracker.get(nodeId2, 0)); + assertNull(amNodeTracker.get(nodeId1, 1)); + assertNotNull(amNodeTracker.get(nodeId2, 1)); + } + + @Test (timeout = 5000) public void testSingleNodeNotBlacklisted() { AppContext appContext = mock(AppContext.class); Configuration conf = new Configuration(false); @@ -142,32 +194,61 @@ public class TestAMNodeTracker { amNodeTracker.init(conf); amNodeTracker.start(); - amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, 0)); - NodeId nodeId = NodeId.newInstance("host1", 1234); - amNodeTracker.nodeSeen(nodeId, 0); + _testSingleNodeNotBlacklisted(amNodeTracker, handler, 0); + } - AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0); + @Test (timeout = 5000) + public void testSingleNodeNotBlacklistedAlternateScheduler() { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2); + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true); + conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33); - ContainerId cId1 = mock(ContainerId.class); - ContainerId cId2 = mock(ContainerId.class); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new 
AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + TaskSchedulerEventHandler taskSchedulerEventHandler = + mock(TaskSchedulerEventHandler.class); + dispatcher.register(AMNodeEventType.class, amNodeTracker); + dispatcher.register(AMContainerEventType.class, amContainerMap); + dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler); + amNodeTracker.init(conf); + amNodeTracker.start(); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2)); + _testSingleNodeNotBlacklisted(amNodeTracker, handler, 1); + } - TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class); - TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class); + @Test (timeout = 5000) + public void testSingleNodeNotBlacklistedAlternateScheduler2() { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2); + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true); + conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId1, ta1, true)); - dispatcher.await(); - assertEquals(1, node.numFailedTAs); - assertEquals(AMNodeState.ACTIVE, node.getState()); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + TaskSchedulerEventHandler taskSchedulerEventHandler = + mock(TaskSchedulerEventHandler.class); + dispatcher.register(AMNodeEventType.class, amNodeTracker); + dispatcher.register(AMContainerEventType.class, amContainerMap); + dispatcher.register(AMSchedulerEventType.class, 
taskSchedulerEventHandler); + amNodeTracker.init(conf); + amNodeTracker.start(); - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true)); - dispatcher.await(); - assertEquals(2, node.numFailedTAs); - assertEquals(1, handler.events.size()); - assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType()); - assertEquals(AMNodeState.FORCED_ACTIVE, node.getState()); + // Register multiple nodes from a scheduler which isn't being tested. + // This should not affect the blacklisting behaviour + for (int i = 0 ; i < 10 ; i++) { + amNodeTracker.nodeSeen(NodeId.newInstance("fakenode" + i, 3333), 0); + } + + _testSingleNodeNotBlacklisted(amNodeTracker, handler, 1); + // No impact on blacklisting for the alternate source + assertFalse(amNodeTracker.isBlacklistingIgnored(0)); } @Test(timeout=10000) @@ -186,50 +267,142 @@ public class TestAMNodeTracker { dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler); amNodeTracker.init(conf); amNodeTracker.start(); + try { + _testNodeSelfBlacklist(amNodeTracker, handler, 0); + } finally { + amNodeTracker.stop(); + } + } - amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4, 0)); + @Test(timeout=10000) + public void testNodeSelfBlacklistAlternateScheduler1() { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + TaskSchedulerEventHandler taskSchedulerEventHandler = + mock(TaskSchedulerEventHandler.class); + dispatcher.register(AMNodeEventType.class, amNodeTracker); + dispatcher.register(AMContainerEventType.class, amContainerMap); + dispatcher.register(AMSchedulerEventType.class, 
taskSchedulerEventHandler); + amNodeTracker.init(conf); + amNodeTracker.start(); + try { + _testNodeSelfBlacklist(amNodeTracker, handler, 1); + } finally { + amNodeTracker.stop(); + } + } + + @Test(timeout=10000) + public void testNodeSelfBlacklistAlternateScheduler2() { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + TaskSchedulerEventHandler taskSchedulerEventHandler = + mock(TaskSchedulerEventHandler.class); + dispatcher.register(AMNodeEventType.class, amNodeTracker); + dispatcher.register(AMContainerEventType.class, amContainerMap); + dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler); + amNodeTracker.init(conf); + amNodeTracker.start(); + try { + // Register multiple nodes from a scheduler which isn't being tested. 
+ // This should not affect the blacklisting behaviour + for (int i = 0 ; i < 100 ; i++) { + amNodeTracker.nodeSeen(NodeId.newInstance("fakenode" + i, 3333), 0); + } + _testNodeSelfBlacklist(amNodeTracker, handler, 1); + assertFalse(amNodeTracker.isBlacklistingIgnored(0)); + } finally { + amNodeTracker.stop(); + } + } + + private void _testSingleNodeNotBlacklisted(AMNodeTracker amNodeTracker, + TestEventHandler handler, int schedulerId) { + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1, schedulerId)); + NodeId nodeId = NodeId.newInstance("host1", 1234); + amNodeTracker.nodeSeen(nodeId, schedulerId); + + AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, schedulerId); + + ContainerId cId1 = mock(ContainerId.class); + ContainerId cId2 = mock(ContainerId.class); + + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId1)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId2)); + + TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class); + TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class); + + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId1, ta1, true)); + dispatcher.await(); + assertEquals(1, node.numFailedTAs); + assertEquals(AMNodeState.ACTIVE, node.getState()); + + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true)); + dispatcher.await(); + assertEquals(2, node.numFailedTAs); + assertEquals(1, handler.events.size()); + assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType()); + assertEquals(AMNodeState.FORCED_ACTIVE, node.getState()); + // Blacklisting should be ignored since the node should have been blacklisted, but has not been + // as a result of being a single node for the source + assertTrue(amNodeTracker.isBlacklistingIgnored(schedulerId)); + } + + private void _testNodeSelfBlacklist(AMNodeTracker amNodeTracker, TestEventHandler handler, int schedulerId) { + amNodeTracker.handle(new 
AMNodeEventNodeCountUpdated(4, schedulerId)); NodeId nodeId = NodeId.newInstance("host1", 1234); NodeId nodeId2 = NodeId.newInstance("host2", 1234); NodeId nodeId3 = NodeId.newInstance("host3", 1234); NodeId nodeId4 = NodeId.newInstance("host4", 1234); - amNodeTracker.nodeSeen(nodeId, 0); - amNodeTracker.nodeSeen(nodeId2, 0); - amNodeTracker.nodeSeen(nodeId3, 0); - amNodeTracker.nodeSeen(nodeId4, 0); - AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, 0); - + amNodeTracker.nodeSeen(nodeId, schedulerId); + amNodeTracker.nodeSeen(nodeId2, schedulerId); + amNodeTracker.nodeSeen(nodeId3, schedulerId); + amNodeTracker.nodeSeen(nodeId4, schedulerId); + AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId, schedulerId); + ContainerId cId1 = mock(ContainerId.class); ContainerId cId2 = mock(ContainerId.class); ContainerId cId3 = mock(ContainerId.class); - - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId1)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId2)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, 0, cId3)); + + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId1)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId2)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, schedulerId, cId3)); assertEquals(3, node.containers.size()); - + TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class); - - amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, 0, cId1, ta1)); + + amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, schedulerId, cId1, ta1)); assertEquals(1, node.numSuccessfulTAs); - - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true)); + + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true)); assertEquals(1, 
node.numSuccessfulTAs); assertEquals(1, node.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node.getState()); // duplicate should not affect anything - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId2, ta2, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId2, ta2, true)); assertEquals(1, node.numSuccessfulTAs); assertEquals(1, node.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node.getState()); - - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, 0, cId3, ta3, true)); + + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, schedulerId, cId3, ta3, true)); dispatcher.await(); assertEquals(1, node.numSuccessfulTAs); assertEquals(2, node.numFailedTAs); assertEquals(AMNodeState.BLACKLISTED, node.getState()); - + assertEquals(4, handler.events.size()); assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType()); assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId()); @@ -246,20 +419,20 @@ public class TestAMNodeTracker { ContainerId cId5 = mock(ContainerId.class); TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class); TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class); - AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, 0); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId4)); - amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, 0, cId5)); - - amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId4, ta4, true)); + AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2, schedulerId); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, schedulerId, cId4)); + amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, schedulerId, cId5)); + + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, schedulerId, cId4, ta4, true)); assertEquals(1, node2.numFailedTAs); assertEquals(AMNodeState.ACTIVE, node2.getState()); - + handler.events.clear(); - 
amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, 0, cId5, ta5, true)); + amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, schedulerId, cId5, ta5, true)); dispatcher.await(); assertEquals(2, node2.numFailedTAs); assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState()); - AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, 0); + AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3, schedulerId); assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState()); assertEquals(5, handler.events.size()); @@ -286,7 +459,7 @@ public class TestAMNodeTracker { // Increase the number of nodes. BLACKLISTING should be re-enabled. // Node 1 and Node 2 should go into BLACKLISTED state handler.events.clear(); - amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, 0)); + amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8, schedulerId)); dispatcher.await(); LOG.info(("Completed waiting for dispatcher to process all pending events")); assertEquals(AMNodeState.BLACKLISTED, node.getState()); @@ -317,7 +490,7 @@ public class TestAMNodeTracker { assertEquals(4, numIgnoreBlacklistingDisabledEvents); assertEquals(2, numBlacklistedEvents); assertEquals(2, numNodeFailedEvents); - + amNodeTracker.stop(); } @@ -336,6 +509,4 @@ public class TestAMNodeTracker { doReturn(healthReportTime).when(nodeReport).getLastHealthReportTime(); return nodeReport; } - - // TODO TEZ-2003. Add tests for multiple sources. }
