Repository: tez Updated Branches: refs/heads/branch-0.7 b55ba592b -> 1b3fbe12c
TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1b3fbe12 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1b3fbe12 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1b3fbe12 Branch: refs/heads/branch-0.7 Commit: 1b3fbe12cd5953f34a00bee8837757eb68392278 Parents: b55ba59 Author: Hitesh Shah <[email protected]> Authored: Fri Feb 19 14:52:28 2016 -0800 Committer: Hitesh Shah <[email protected]> Committed: Fri Feb 19 14:52:28 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../dag/app/rm/YarnTaskSchedulerService.java | 7 +- .../tez/dag/app/rm/TestContainerReuse.java | 67 ++++++++++++++++++-- 3 files changed, 69 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1b3fbe12/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 81a50a3..35f773f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,12 +1,15 @@ Apache Tez Change Log ===================== +Release 0.7.1: Unreleased + INCOMPATIBLE CHANGES TEZ-2679. Admin forms of launch env settings TEZ-2949. Allow duplicate dag names within session for Tez. TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-3123. Containers can get re-used even with conflicting local resources. TEZ-3117. Deadlock in Edge and Vertex code TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime). TEZ-3103. Shuffle can hang when memory to memory merging enabled http://git-wip-us.apache.org/repos/asf/tez/blob/1b3fbe12/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index f7b7ea3..f8e2b5d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -2351,11 +2351,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // Merge the container signatures to account for any changes to the container // footprint. For example, re-localization of additional resources will // cause the held container's signature to change. - lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature(); - if (lastTaskInfo != null && lastTaskInfo.getCookie().getContainerSignature() != null) { + if (lastAssignedContainerSignature != null) { lastAssignedContainerSignature = signatureMatcher.union( - lastTaskInfo.getCookie().getContainerSignature(), + lastAssignedContainerSignature, taskInfo.getCookie().getContainerSignature()); + } else { + lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature(); } lastTaskInfo = taskInfo; } http://git-wip-us.apache.org/repos/asf/tez/blob/1b3fbe12/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 79450a9..7ad044b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1206,7 +1206,8 @@ public class TestContainerReuse { TaskAttempt ta111 = mock(TaskAttempt.class); doReturn(taID111).when(ta111).getID(); doReturn("Mock for TA " + taID111.toString()).when(ta111).toString(); - AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, v11LR); + AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent( + taID111, ta111, resource1, host1, racks, priority1, v11LR); Map<String, LocalResource> v12LR = Maps.newHashMap(); v12LR.put(rsrc1, lr1); @@ -1217,7 +1218,24 @@ public class TestContainerReuse { TaskAttempt ta112 = mock(TaskAttempt.class); doReturn(taID112).when(ta112).getID(); doReturn("Mock for TA " + taID112.toString()).when(ta112).toString(); - AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR); + AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent( + taID112, ta112, resource1, host1, racks, priority1, v12LR); + + //Vertex 1, Task 3, Attempt 1, host1 + TezTaskAttemptID taID113 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 3), 1); + TaskAttempt ta113 = mock(TaskAttempt.class); + doReturn(taID113).when(ta113).getID(); + doReturn("Mock for TA " + taID113.toString()).when(ta113).toString(); + AMSchedulerEventTALaunchRequest lrEvent13 = createLaunchRequestEvent( + taID113, ta113, resource1, host1, racks, priority1, new HashMap<String, LocalResource>()); + //Vertex 1, Task 4, Attempt 1, host1 + TezTaskAttemptID taID114 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 4), 1); + TaskAttempt ta114 = mock(TaskAttempt.class); + doReturn(taID114).when(ta114).getID(); + doReturn("Mock for TA " + taID114.toString()).when(ta114).toString(); + AMSchedulerEventTALaunchRequest lrEvent14 = createLaunchRequestEvent( + taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>()); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent11); @@ -1239,7 +1257,8 @@ public class TestContainerReuse { assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), + TaskAttemptState.SUCCEEDED)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta111), eq(true)); verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1)); @@ -1251,14 +1270,52 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), + TaskAttemptState.SUCCEEDED)); drainableAppCallback.drain(); verify(taskScheduler).deallocateTask(eq(ta112), eq(true)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); - // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused. + // Task 3 + drainNotifier.set(false); + taskSchedulerEventHandler.handleEvent(lrEvent13); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + + verify(taskSchedulerEventHandler).taskAllocated(eq(ta113), any(Object.class), eq(container1)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta113, container1.getId(), + TaskAttemptState.SUCCEEDED)); + drainableAppCallback.drain(); + verify(taskScheduler).deallocateTask(eq(ta113), eq(true)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // Task 4 + drainNotifier.set(false); + taskSchedulerEventHandler.handleEvent(lrEvent14); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + + verify(taskSchedulerEventHandler).taskAllocated(eq(ta114), any(Object.class), eq(container1)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta114, container1.getId(), + TaskAttemptState.SUCCEEDED)); + drainableAppCallback.drain(); + verify(taskScheduler).deallocateTask(eq(ta114), eq(true)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + + // Setup DAG2 with different resources. TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0); dagIDAnswer.setDAGID(dagID2);
