TEZ-2192. Relocalization does not check for source. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7aeebfe7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7aeebfe7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7aeebfe7 Branch: refs/heads/TEZ-2003 Commit: 7aeebfe7cd07786d2040b568a924fa58783c7ba0 Parents: 26518d5 Author: Hitesh Shah <[email protected]> Authored: Mon Apr 6 14:38:03 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Apr 6 14:38:03 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/ContainerContext.java | 73 ++++++++ .../dag/app/rm/YarnTaskSchedulerService.java | 41 +++-- .../rm/container/ContainerContextMatcher.java | 7 + .../rm/container/ContainerSignatureMatcher.java | 12 ++ .../tez/dag/app/rm/TestContainerReuse.java | 167 ++++++++++++++++++- .../tez/dag/app/rm/TestTaskScheduler.java | 21 ++- .../dag/app/rm/TestTaskSchedulerHelpers.java | 10 ++ 8 files changed, 308 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ee0ef70..be2e617 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -263,6 +263,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2192. Relocalization does not check for source. TEZ-2224. EventQueue empty doesn't mean events are consumed in RecoveryService TEZ-2240. Fix toUpperCase/toLowerCase to use Locale.ENGLISH. TEZ-2238. TestContainerReuse flaky http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java index e7f1a10..f00b27b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerContext.java @@ -23,6 +23,9 @@ import java.util.Map; import java.util.Map.Entry; import javax.annotation.Nullable; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.Credentials; @@ -142,6 +145,11 @@ public class ContainerContext { for (Entry<String, LocalResource> additionalLREntry : reqLRsCopy.entrySet()) { LocalResource lr = additionalLREntry.getValue(); if (EnumSet.of(LocalResourceType.ARCHIVE, LocalResourceType.PATTERN).contains(lr.getType())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot match container: Additional local resource needed is not of type FILE" + + ", resourceName: " + additionalLREntry.getKey() + + ", resourceDetails: " + additionalLREntry); + } return false; } } @@ -177,4 +185,69 @@ public class ContainerContext { } return true; } + + /** + * Create a new ContainerContext to account for container re-use. On re-use, there is + * re-localization of additional LocalResources. Also, a task from a different vertex could be + * run on the given container. + * + * Only a merge of local resources is needed as: + * + * credentials are modified at run-time based on the task spec. + * the environment for a container cannot be changed. A re-used container is always + * expected to have a super-set. + * javaOpts have to be identical for re-use. + * + * Vertex should be overridden to account for the new task being scheduled to run on this + * container context. + * + * @param c1 ContainerContext 1 Original task's context + * @param c2 ContainerContext 2 Newly assigned task's context + * @return Merged ContainerContext + */ + public static ContainerContext union(ContainerContext c1, ContainerContext c2) { + HashMap<String, LocalResource> mergedLR = new HashMap<String, LocalResource>(); + mergedLR.putAll(c1.getLocalResources()); + mergedLR.putAll(c2.getLocalResources()); + ContainerContext union = new ContainerContext(mergedLR, c1.credentials, c1.environment, + c1.javaOpts, c2.vertex); + return union; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("LocalResources: ["); + if (localResources != null) { + for (Map.Entry<String, LocalResource> lr : localResources.entrySet()) { + sb.append("[ name=") + .append(lr.getKey()) + .append(", value=") + .append(lr.getValue()) + .append("],"); + } + } + sb.append("], environment: ["); + if (environment != null) { + for (Map.Entry<String, String> entry : environment.entrySet()) { + sb.append("[ ").append(entry.getKey()).append("=").append(entry.getValue()) + .append(" ],"); + } + } + sb.append("], credentials(token kinds): ["); + if (credentials != null) { + for (Token<? extends TokenIdentifier> t : credentials.getAllTokens()) { + sb.append(t.getKind().toString()) + .append(","); + } + } + sb.append("], javaOpts: ") + .append(javaOpts) + .append(", vertex: ") + .append(( vertex == null ? "null" : vertex.getLogIdentifier())); + + return sb.toString(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index ff90e5d..66a6f33 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -1386,7 +1386,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService + " containerId=" + heldContainer.container.getId()); } if (containerSignatureMatcher.isSuperSet(heldContainer - .getFirstContainerSignature(), cookieContainerRequest.getCookie() + .getLastAssignedContainerSignature(), cookieContainerRequest.getCookie() .getContainerSignature())) { if (LOG.isDebugEnabled()) { LOG.debug("Matched delayed container to task" @@ -1441,7 +1441,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService HeldContainer heldContainer = heldContainers.get(container.getId()); if (!shouldReuseContainers && heldContainer == null) { heldContainers.put(container.getId(), new HeldContainer(container, - -1, -1, assigned)); + -1, -1, assigned, this.containerSignatureMatcher)); Resources.addTo(allocatedResources, container.getResource()); } else { if (heldContainer.isNew()) { @@ -1451,7 +1451,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // think about preferring within vertex matching etc. heldContainers.put(container.getId(), new HeldContainer(container, heldContainer.getNextScheduleTime(), - heldContainer.getContainerExpiryTime(), assigned)); + heldContainer.getContainerExpiryTime(), assigned, this.containerSignatureMatcher)); } heldContainer.setLastTaskInfo(assigned); } @@ -1463,7 +1463,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService synchronized (delayedContainerManager) { for (Container container : containers) { if (heldContainers.put(container.getId(), new HeldContainer(container, - -1, expireTime, null)) != null) { + -1, expireTime, null, this.containerSignatureMatcher)) != null) { throw new TezUncheckedException("New container " + container.getId() + " is already held."); } @@ -2116,33 +2116,36 @@ public class YarnTaskSchedulerService extends TaskSchedulerService NON_LOCAL } - Container container; - private String rack; + final Container container; + final private String rack; private long nextScheduleTime; - private Object firstContainerSignature; private LocalityMatchLevel localityMatchLevel; private long containerExpiryTime; private CookieContainerRequest lastTaskInfo; private int numAssignmentAttempts = 0; + private Object lastAssignedContainerSignature; + final ContainerSignatureMatcher signatureMatcher; HeldContainer(Container container, long nextScheduleTime, long containerExpiryTime, - CookieContainerRequest firstTaskInfo) { + CookieContainerRequest firstTaskInfo, + ContainerSignatureMatcher signatureMatcher) { this.container = container; this.nextScheduleTime = nextScheduleTime; if (firstTaskInfo != null) { this.lastTaskInfo = firstTaskInfo; - this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature(); + this.lastAssignedContainerSignature = firstTaskInfo.getCookie().getContainerSignature(); } this.localityMatchLevel = LocalityMatchLevel.NODE; this.containerExpiryTime = containerExpiryTime; this.rack = RackResolver.resolve(container.getNodeId().getHost()) .getNetworkLocation(); + this.signatureMatcher = signatureMatcher; } boolean isNew() { - return firstContainerSignature == null; + return lastTaskInfo == null; } String getRack() { @@ -2181,15 +2184,24 @@ public class YarnTaskSchedulerService extends TaskSchedulerService this.containerExpiryTime = containerExpiryTime; } - public Object getFirstContainerSignature() { - return this.firstContainerSignature; + public Object getLastAssignedContainerSignature() { + return this.lastAssignedContainerSignature; } - + public CookieContainerRequest getLastTaskInfo() { return this.lastTaskInfo; } public void setLastTaskInfo(CookieContainerRequest taskInfo) { + // Merge the container signatures to account for any changes to the container + // footprint. For example, re-localization of additional resources will + // cause the held container's signature to change. + lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature(); + if (lastTaskInfo != null && lastTaskInfo.getCookie().getContainerSignature() != null) { + lastAssignedContainerSignature = signatureMatcher.union( + lastTaskInfo.getCookie().getContainerSignature(), + taskInfo.getCookie().getContainerSignature()); + } lastTaskInfo = taskInfo; } @@ -2220,7 +2232,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService + ", nextScheduleTime: " + nextScheduleTime + ", localityMatchLevel=" + localityMatchLevel + ", signature: " - + (firstContainerSignature != null? firstContainerSignature.toString():"null"); + + (lastAssignedContainerSignature != null? lastAssignedContainerSignature.toString() + : "null"); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java index 6d65a67..211c537 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java @@ -70,4 +70,11 @@ public class ContainerContextMatcher implements ContainerSignatureMatcher { } return c2LocalResources; } + + @Override + public Object union(Object cs1, Object cs2) { + checkArguments(cs1, cs2); + return ContainerContext.union((ContainerContext) cs1, (ContainerContext) cs2); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java index df17564..0f9c2d6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java @@ -45,4 +45,16 @@ public interface ContainerSignatureMatcher { */ public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1, Map<String, LocalResource> lr2); + + + /** + * Do a union of 2 signatures + * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2. + * i.e. isSuperSet should not return false. + * @param cs1 Signature 1 Original signature + * @param cs2 Signature 2 New signature + * @return Union of 2 signatures + */ + public Object union(Object cs1, Object cs2); + } http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index c70003b..89b77a7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1028,7 +1028,7 @@ public class TestContainerReuse { LocalResource lr1 = mock(LocalResource.class); LocalResource lr2 = mock(LocalResource.class); LocalResource lr3 = mock(LocalResource.class); - + AMContainerEventAssignTA assignEvent = null; Map<String, LocalResource> dag1LRs = Maps.newHashMap(); @@ -1091,7 +1091,7 @@ public class TestContainerReuse { Map<String, LocalResource> dag2LRs = Maps.newHashMap(); dag2LRs.put(rsrc2, lr2); dag2LRs.put(rsrc3, lr3); - + TezVertexID vertexID21 = TezVertexID.getInstance(dagID2, 1); //Vertex 2, Task 1, Attempt 1, host1, lr2 @@ -1132,6 +1132,169 @@ public class TestContainerReuse { taskSchedulerEventHandler.close(); } + @Test(timeout = 30000l) + public void testReuseConflictLocalResources() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testReuseLocalResourcesChanged"); + Configuration tezConf = new Configuration(new YarnConfiguration()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); + tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1); + + TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); + + CapturingEventHandler eventHandler = new CapturingEventHandler(); + TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); + + AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); + TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); + String appUrl = "url"; + String appMsg = "success"; + AppFinalStatus finalStatus = + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); + + doReturn(finalStatus).when(mockApp).getFinalAppStatus(); + + AppContext appContext = mock(AppContext.class); + doReturn(new Configuration(false)).when(appContext).getAMConf(); + ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1); + AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), + mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext); + AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); + doReturn(amContainerMap).when(appContext).getAllContainers(); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(true).when(appContext).isSession(); + doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); + doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); + + TaskSchedulerEventHandler taskSchedulerEventHandlerReal = + new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, + new ContainerContextMatcher()); + TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal); + taskSchedulerEventHandler.init(tezConf); + taskSchedulerEventHandler.start(); + + TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + .getSpyTaskScheduler(); + TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + AtomicBoolean drainNotifier = new AtomicBoolean(false); + taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; + + Resource resource1 = Resource.newInstance(1024, 1); + String[] host1 = {"host1"}; + + String []racks = {"/default-rack"}; + Priority priority1 = Priority.newInstance(1); + + String rsrc1 = "rsrc1"; + String rsrc2 = "rsrc2"; + LocalResource lr1 = mock(LocalResource.class); + LocalResource lr2 = mock(LocalResource.class); + LocalResource lr3 = mock(LocalResource.class); + + AMContainerEventAssignTA assignEvent = null; + + Map<String, LocalResource> v11LR = Maps.newHashMap(); + v11LR.put(rsrc1, lr1); + + TezVertexID vertexID11 = TezVertexID.getInstance(dagID1, 1); + + //Vertex 1, Task 1, Attempt 1, host1, lr1 + TezTaskAttemptID taID111 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 1), 1); + TaskAttempt ta111 = mock(TaskAttempt.class); + doReturn(taID111).when(ta111).getID(); + doReturn("Mock for TA " + taID111.toString()).when(ta111).toString(); + AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, v11LR); + + Map<String, LocalResource> v12LR = Maps.newHashMap(); + v12LR.put(rsrc1, lr1); + v12LR.put(rsrc2, lr2); + + //Vertex 1, Task 2, Attempt 1, host1, lr1 + TezTaskAttemptID taID112 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 2), 1); + TaskAttempt ta112 = mock(TaskAttempt.class); + doReturn(taID112).when(ta112).getID(); + doReturn("Mock for TA " + taID112.toString()).when(ta112).toString(); + AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR); + + drainNotifier.set(false); + taskSchedulerEventHandler.handleEvent(lrEvent11); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainNotifier.set(false); + taskSchedulerEventHandler.handleEvent(lrEvent12); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + + Container container1 = createContainer(1, "host1", resource1, priority1); + Container container2 = createContainer(2, "host1", resource1, priority1); + + // One container allocated. + drainNotifier.set(false); + taskScheduler.onContainersAllocated(Collections.singletonList(container1)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); + verify(taskSchedulerEventHandler).taskAllocated(eq(ta111), any(Object.class), eq(container1)); + assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); + assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); + + // Task assigned to container completed successfully. Container should be re-used. + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED)); + drainableAppCallback.drain(); + verify(taskScheduler).deallocateTask(eq(ta111), eq(true)); + verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); + assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); + eventHandler.reset(); + + // Task assigned to container completed successfully. + // Verify reuse across hosts. + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED)); + drainableAppCallback.drain(); + verify(taskScheduler).deallocateTask(eq(ta112), eq(true)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused. + TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0); + dagIDAnswer.setDAGID(dagID2); + + Map<String, LocalResource> v21LR = Maps.newHashMap(); + v21LR.put(rsrc1, lr1); + v21LR.put(rsrc2, lr3); + + TezVertexID vertexID21 = TezVertexID.getInstance(dagID2, 1); + + //Vertex 2, Task 1, Attempt 1, host1, lr2 + TezTaskAttemptID taID211 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID21, 1), 1); + TaskAttempt ta211 = mock(TaskAttempt.class); + doReturn(taID211).when(ta211).getID(); + doReturn("Mock for TA " + taID211.toString()).when(ta211).toString(); + AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1, + host1, racks, priority1, v21LR); + + taskSchedulerEventHandler.handleEvent(lrEvent21); + drainableAppCallback.drain(); + + // TODO This is terrible, need a better way to ensure the scheduling loop has run + LOG.info("Sleeping to ensure that the scheduling loop runs"); + Thread.sleep(6000l); + drainNotifier.set(false); + taskScheduler.onContainersAllocated(Collections.singletonList(container2)); + + Thread.sleep(6000l); + verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); + verify(taskSchedulerEventHandler).taskAllocated(eq(ta211), any(Object.class), eq(container2)); + eventHandler.reset(); + + taskScheduler.close(); + taskSchedulerEventHandler.close(); + } + + private Container createContainer(int id, String host, Resource resource, Priority priority) { ContainerId containerID = ContainerId.newInstance( ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 21bce6d..dabae67 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -74,6 +74,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallba import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher; +import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -90,6 +91,8 @@ public class TestTaskScheduler { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher(); + @BeforeClass public static void beforeClass() { MockDNSToSwitchMapping.initializeMockRackResolver(); @@ -527,6 +530,7 @@ public class TestTaskScheduler { conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); // to release immediately after deallocate conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); scheduler.init(conf); drainableAppCallback.drain(); @@ -1049,7 +1053,7 @@ public class TestTaskScheduler { ContainerId mockCId1 = ContainerId.newInstance(appId, 0); Container c1 = mock(Container.class, RETURNS_DEEP_STUBS); when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null)); + HeldContainer hc1 = Mockito.spy(new HeldContainer(c1, 0, 0, null, containerSignatureMatcher)); when(hc1.getNode()).thenReturn(node1Rack1); when(hc1.getRack()).thenReturn(rack1); when(c1.getId()).thenReturn(mockCId1); @@ -1058,7 +1062,7 @@ public class TestTaskScheduler { ContainerId mockCId2 = ContainerId.newInstance(appId, 1); Container c2 = mock(Container.class, RETURNS_DEEP_STUBS); when(c2.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null)); + HeldContainer hc2 = Mockito.spy(new HeldContainer(c2, 0, 0, null, containerSignatureMatcher)); when(hc2.getNode()).thenReturn(node2Rack1); when(hc2.getRack()).thenReturn(rack1); when(c2.getId()).thenReturn(mockCId2); @@ -1067,7 +1071,7 @@ public class TestTaskScheduler { ContainerId mockCId3 = ContainerId.newInstance(appId, 2); Container c3 = mock(Container.class, RETURNS_DEEP_STUBS); when(c3.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null)); + HeldContainer hc3 = Mockito.spy(new HeldContainer(c3, 0, 0, null, containerSignatureMatcher)); when(hc3.getNode()).thenReturn(node1Rack1); when(hc3.getRack()).thenReturn(rack1); when(c3.getId()).thenReturn(mockCId3); @@ -1076,7 +1080,7 @@ public class TestTaskScheduler { ContainerId mockCId4 = ContainerId.newInstance(appId, 3); Container c4 = mock(Container.class, RETURNS_DEEP_STUBS); when(c4.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null)); + HeldContainer hc4 = Mockito.spy(new HeldContainer(c4, 0, 0, null, containerSignatureMatcher)); when(hc4.getNode()).thenReturn(node2Rack1); when(hc4.getRack()).thenReturn(rack1); when(c4.getId()).thenReturn(mockCId4); @@ -1085,7 +1089,7 @@ public class TestTaskScheduler { ContainerId mockCId5 = ContainerId.newInstance(appId, 4); Container c5 = mock(Container.class, RETURNS_DEEP_STUBS); when(c5.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null)); + HeldContainer hc5 = Mockito.spy(new HeldContainer(c5, 0, 0, null, containerSignatureMatcher)); when(hc5.getNode()).thenReturn(node1Rack2); when(hc5.getRack()).thenReturn(rack2); when(c5.getId()).thenReturn(mockCId5); @@ -1094,7 +1098,7 @@ public class TestTaskScheduler { ContainerId mockCId6 = ContainerId.newInstance(appId, 5); Container c6 = mock(Container.class, RETURNS_DEEP_STUBS); when(c6.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null)); + HeldContainer hc6 = Mockito.spy(new HeldContainer(c6, 0, 0, null, containerSignatureMatcher)); when(hc6.getNode()).thenReturn(node2Rack2); when(hc6.getRack()).thenReturn(rack2); when(c6.getId()).thenReturn(mockCId6); @@ -1103,7 +1107,7 @@ public class TestTaskScheduler { ContainerId mockCId7 = ContainerId.newInstance(appId, 6); Container c7 = mock(Container.class, RETURNS_DEEP_STUBS); when(c7.getNodeId().getHost()).thenReturn(""); // we are mocking directly - HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null)); + HeldContainer hc7 = Mockito.spy(new HeldContainer(c7, 0, 0, null, containerSignatureMatcher)); when(hc7.getNode()).thenReturn(node1Rack3); when(hc7.getRack()).thenReturn(rack3); when(c7.getId()).thenReturn(mockCId7); @@ -1472,7 +1476,8 @@ public class TestTaskScheduler { containers.add(mockContainer4); // Fudge new container being present in delayed allocation list due to race - HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null); + HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null, + containerSignatureMatcher); scheduler.delayedContainerManager.delayedContainers.add(heldContainer); // no preemption - container assignment attempts < 3 scheduler.getProgress(); http://git-wip-us.apache.org/repos/asf/tez/blob/7aeebfe7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index bb44889..77c98b7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -342,6 +342,11 @@ class TestTaskSchedulerHelpers { Map<String, LocalResource> lr2) { return Maps.newHashMap(); } + + @Override + public Object union(Object cs1, Object cs2) { + return cs1; + } } static class PreemptionMatcher implements ContainerSignatureMatcher { @@ -365,6 +370,11 @@ class TestTaskSchedulerHelpers { Map<String, LocalResource> lr2) { return Maps.newHashMap(); } + + @Override + public Object union(Object cs1, Object cs2) { + return cs1; + } }
