Repository: tez Updated Branches: refs/heads/branch-0.9 b1fbcebbe -> 15d853787
TEZ-3935. DAG aware scheduler should release unassigned new containers rather than hold them (Jason Lowe via jeagles) (cherry picked from commit e72b0a23ab28ee4759aa5e328f5a19cecc020942) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/15d85378 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/15d85378 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/15d85378 Branch: refs/heads/branch-0.9 Commit: 15d853787c1edee36622aeb44b0c1a93e7e24c7f Parents: b1fbceb Author: Jonathan Eagles <[email protected]> Authored: Tue May 22 13:28:55 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue May 22 13:30:57 2018 -0500 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 14 ++++ .../dag/app/rm/DagAwareYarnTaskScheduler.java | 7 +- .../app/rm/TestDagAwareYarnTaskScheduler.java | 73 +++++++++++++++++++- 3 files changed, 92 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/15d85378/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 243f278..50b17b9 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1021,6 +1021,20 @@ public class TezConfiguration extends Configuration { TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT = false; /** + * Boolean value. Whether to reuse new containers that could not be immediately assigned to + * pending requests. 
If enabled then newly assigned containers that cannot be immediately + * allocated will be held for potential reuse as if they were containers that had just completed + * a task. If disabled then newly assigned containers that cannot be immediately allocated will + * be released. Active only if container reuse is enabled. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED = + TEZ_AM_PREFIX + "container.reuse.new-containers.enabled"; + public static final boolean + TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED_DEFAULT = false; + + /** * Int value. The amount of time to wait before assigning a container to the next level * of locality. NODE -> RACK -> NON_LOCAL. Delay scheduling parameter. Expert level setting. */ http://git-wip-us.apache.org/repos/asf/tez/blob/15d85378/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java index dab1cad..167d879 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java @@ -147,6 +147,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler private boolean shouldReuseContainers; private boolean reuseRackLocal; private boolean reuseNonLocal; + private boolean reuseNewContainers; private long localitySchedulingDelay; private long idleContainerTimeoutMin; private long idleContainerTimeoutMax; @@ -192,6 +193,10 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been" + " enabled"); + reuseNewContainers = shouldReuseContainers && conf.getBoolean( + 
TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED, + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED_DEFAULT); + localitySchedulingDelay = conf.getLong( TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT); @@ -362,7 +367,7 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler } for (HeldContainer hc : unassigned) { - if (shouldReuseContainers) { + if (reuseNewContainers) { idleTracker.add(hc); TaskRequest assigned = tryAssignReuseContainer(hc, appState, isSession); if (assigned != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/15d85378/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java index 529f65c..0910ed2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestDagAwareYarnTaskScheduler.java @@ -1229,7 +1229,7 @@ public class TestDagAwareYarnTaskScheduler { } @Test(timeout=50000) - public void testIdleContainerAssignment() throws Exception { + public void testContainerAssignmentReleaseNewContainers() throws Exception { AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest()); String appHost = "host"; @@ -1241,6 +1241,77 @@ public class TestDagAwareYarnTaskScheduler { conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); + 
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED, false); + conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100); + conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 4000); + conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 5000); + conf.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 5); + + DagInfo mockDagInfo = mock(DagInfo.class); + when(mockDagInfo.getTotalVertices()).thenReturn(10); + when(mockDagInfo.getVertexDescendants(anyInt())).thenReturn(new BitSet()); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + when(mockApp.getCurrentDagInfo()).thenReturn(mockDagInfo); + when(mockApp.isSession()).thenReturn(true); + TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + MockClock clock = new MockClock(1000); + NewTaskSchedulerForTest scheduler = new NewTaskSchedulerForTest(drainableAppCallback, + mockRMClient, clock); + + scheduler.initialize(); + drainableAppCallback.drain(); + + scheduler.start(); + drainableAppCallback.drain(); + verify(mockRMClient).start(); + verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl); + RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse(); + verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(), + regResponse.getApplicationACLs(), regResponse.getClientToAMTokenMasterKey(), + regResponse.getQueue()); + + assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount()); + + final String rack1 = "/r1"; + final String rack2 = "/r2"; + final String node1Rack1 = "n1r1"; + final String node2Rack1 = "n2r1"; + final String node1Rack2 = "n1r2"; + MockDNSToSwitchMapping.addRackMapping(node1Rack1, rack1); + MockDNSToSwitchMapping.addRackMapping(node2Rack1, rack1); + MockDNSToSwitchMapping.addRackMapping(node1Rack2, rack2); + + Priority 
priorityv0 = Priority.newInstance(1); + MockTaskInfo taskv0t0 = new MockTaskInfo("taskv0t0", priorityv0, node1Rack1, rack1); + + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1); + ContainerId cid1 = ContainerId.newContainerId(attemptId, 1); + NodeId n2r1 = NodeId.newInstance(node2Rack1, 1); + Container container1 = Container.newInstance(cid1, n2r1, null, taskv0t0.capability, priorityv0, null); + + // verify new container is released if not immediately allocated + scheduler.onContainersAllocated(Collections.singletonList(container1)); + drainableAppCallback.drain(); + // app is not notified of the container being released since it never launched + verify(mockApp, never()).containerBeingReleased(cid1); + verify(mockRMClient).releaseAssignedContainer(eq(cid1)); + } + + @Test(timeout=50000) + public void testIdleContainerAssignmentReuseNewContainers() throws Exception { + AMRMClientAsyncWrapperForTest mockRMClient = spy(new AMRMClientAsyncWrapperForTest()); + + String appHost = "host"; + int appPort = 0; + String appUrl = "url"; + + Configuration conf = new Configuration(); + conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); + conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100); + conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); + conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); + conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NEW_CONTAINERS_ENABLED, true); conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100); conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 4000); conf.setInt(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 5000);
