Repository: tez Updated Branches: refs/heads/master de51d40e9 -> 92b20cc26
TEZ-3000. Fix TestContainerReuse. (mingma) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/92b20cc2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/92b20cc2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/92b20cc2 Branch: refs/heads/master Commit: 92b20cc26d2ab3847922aca8077e1e303f8a597c Parents: de51d40 Author: Ming Ma <[email protected]> Authored: Wed Sep 21 16:19:54 2016 -0700 Committer: Ming Ma <[email protected]> Committed: Wed Sep 21 16:19:54 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + tez-dag/findbugs-exclude.xml | 7 ++ .../dag/app/rm/YarnTaskSchedulerService.java | 16 +++- .../tez/dag/app/rm/TestContainerReuse.java | 88 +++++--------------- .../dag/app/rm/TestTaskSchedulerHelpers.java | 16 ++-- 5 files changed, 50 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bdfd4c2..9a24eac 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3000. Fix TestContainerReuse. TEZ-3436. Check input and output count before start in MapProcessor. TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing TEZ-3434. Add unit tests for flushing of recovery events. @@ -111,6 +112,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3000. Fix TestContainerReuse. TEZ-3436. Check input and output count before start in MapProcessor. TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-3326. Display JVM system properties in AM and task logs. http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index 0f3cdca..5eed7eb 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -224,6 +224,13 @@ <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> </Match> + <!-- TEZ-3000 --> + <Match> + <Class name="org.apache.tez.dag.app.rm.YarnTaskSchedulerService$DelayedContainerManager"/> + <Method name="addDelayedContainer"/><Field name="drainedDelayedContainersForTest"/> + <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> + </Match> + <!-- TEZ-1981 --> <Match> <Class name="org.apache.tez.dag.app.dag.TaskAttempt$TaskAttemptStatus"/> http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index bd4ac2f..6dfc7b9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -134,9 +134,9 @@ public class YarnTaskSchedulerService extends TaskScheduler AtomicBoolean isStopStarted = new AtomicBoolean(false); - private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner(); - private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner(); - private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner(); + private ContainerAssigner NODE_LOCAL_ASSIGNER; + private ContainerAssigner RACK_LOCAL_ASSIGNER; + private ContainerAssigner NON_LOCAL_ASSIGNER; DelayedContainerManager delayedContainerManager; long localitySchedulingDelay; @@ -339,6 +339,9 @@ public class YarnTaskSchedulerService extends TaskScheduler Preconditions.checkArgument(preemptionMaxWaitTime >=0, "Preemption max wait time must be >=0"); delayedContainerManager = new DelayedContainerManager(); + NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner(); + RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner(); + NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner(); LOG.info("YarnTaskScheduler initialized with configuration: " + "maxRMHeartbeatInterval: " + heartbeatIntervalMax + ", containerReuseEnabled: " + shouldReuseContainers + @@ -674,7 +677,7 @@ public class YarnTaskSchedulerService extends TaskScheduler heldContainer.resetLocalityMatchLevel(); delayedContainerManager.addDelayedContainer( heldContainer.getContainer(), currentTime - + localitySchedulingDelay); + + localitySchedulingDelay); } } else if (state.equals(AMState.RUNNING_APP)) { // clear min held containers since we need to allocate to tasks @@ -2130,6 +2133,11 @@ public class YarnTaskSchedulerService extends TaskScheduler boolean added = false; synchronized(this) { added = delayedContainers.offer(delayedContainer); + if (drainedDelayedContainersForTest != null) { + synchronized (drainedDelayedContainersForTest) { + drainedDelayedContainersForTest.set(false); + } + } this.notify(); } if (!added) { http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index a45f092..f21de3e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -191,17 +191,12 @@ public class TestContainerReuse { createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrTa11); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrTa21); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container containerHost1 = createContainer(1, host1[0], resource, priority); Container containerHost2 = createContainer(2, host2[0], resource, priority); - drainNotifier.set(false); taskScheduler.onContainersAllocated( Lists.newArrayList(containerHost1, containerHost2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); @@ -320,17 +315,12 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent( taID31, ta31, resource, host1, defaultRack, priority); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrTa11); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrTa21); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container containerHost1 = createContainer(1, host1[0], resource, priority); Container containerHost2 = createContainer(2, host2[0], resource, priority); - drainNotifier.set(false); taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -432,7 +422,6 @@ public class TestContainerReuse { Container container1 = createContainer(1, "host1", resource1, priority1); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -478,7 +467,6 @@ public class TestContainerReuse { Container container2 = createContainer(2, "host2", resource1, priority1); // Second container allocated. Should be allocated to the last task. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -573,17 +561,12 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent1); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent2); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -622,16 +605,11 @@ public class TestContainerReuse { createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1, localResources, tsLaunchCmdOpts); - Container container2 = createContainer(2, "host2", resource1, priority1); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent3); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent4); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); // Container started - drainNotifier.set(false); + Container container2 = createContainer(2, "host2", resource1, priority1); taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -675,11 +653,10 @@ public class TestContainerReuse { priority1, localResources, taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 6)); // Container started - Container container3 = createContainer(2, "host3", resource1, priority1); + Container container3 = createContainer(3, "host3", resource1, priority1); taskSchedulerManager.handleEvent(lrEvent5); taskSchedulerManager.handleEvent(lrEvent6); - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container3)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -772,14 +749,11 @@ public class TestContainerReuse { taID12, ta12, resource1, emptyHosts, racks, priority); // Send launch request for task 1 only, deterministic assignment to this task. - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent11); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "randomHost", resource1, priority); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -803,8 +777,8 @@ public class TestContainerReuse { eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); - LOG.info("Sleeping to ensure that the scheduling loop runs"); - Thread.sleep(3000l); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated( eq(0), eq(ta12), any(Object.class), eq(container1)); @@ -812,9 +786,10 @@ public class TestContainerReuse { taskSchedulerManager.handleEvent( new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); + + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - LOG.info("Sleeping to ensure that the scheduling loop runs"); - Thread.sleep(3000l); + verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); @@ -831,7 +806,9 @@ public class TestContainerReuse { tezConf.setLong( TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l); tezConf.setLong( - TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l); + TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 20l); + tezConf.setLong( + TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 30l); tezConf.setInt( TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1); @@ -900,14 +877,11 @@ public class TestContainerReuse { taID21, ta21, resource1, host1, racks, priority2); // Send launch request for task 1 onle, deterministic assignment to this task. - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent11); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, host1[0], resource1, priority1); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -934,8 +908,9 @@ public class TestContainerReuse { TaskAttemptState.SUCCEEDED, null, null, 0)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); - LOG.info("Sleeping to ensure that the scheduling loop runs"); - Thread.sleep(3000l); + LOG.info("Sleeping to ensure that the container has been idled longer " + + "than TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS "); + Thread.sleep(50l); // container should not get released due to min held containers verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -1014,17 +989,12 @@ public class TestContainerReuse { TaskAttempt ta112 = mock(TaskAttempt.class); AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent11); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent12); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -1078,11 +1048,9 @@ public class TestContainerReuse { taskSchedulerManager.handleEvent(lrEvent21); taskSchedulerManager.handleEvent(lrEvent22); - drainableAppCallback.drain(); - // TODO This is terrible, need a better way to ensure the scheduling loop has run - LOG.info("Sleeping to ensure that the scheduling loop runs"); - Thread.sleep(6000l); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -1204,18 +1172,13 @@ public class TestContainerReuse { taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>()); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent11); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent12); - TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); Container container2 = createContainer(2, "host1", resource1, priority1); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); @@ -1247,10 +1210,9 @@ public class TestContainerReuse { eventHandler.reset(); // Task 3 - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent13); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - + drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1265,10 +1227,9 @@ public class TestContainerReuse { eventHandler.reset(); // Task 4 - drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent14); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); - + drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1302,16 +1263,15 @@ public class TestContainerReuse { host1, racks, priority1, v21LR); taskSchedulerManager.handleEvent(lrEvent21); - drainableAppCallback.drain(); - // TODO This is terrible, need a better way to ensure the scheduling loop has run - LOG.info("Sleeping to ensure that the scheduling loop runs"); - Thread.sleep(6000l); - drainNotifier.set(false); - taskScheduler.onContainersAllocated(Collections.singletonList(container2)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); - Thread.sleep(6000l); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); + + taskScheduler.onContainersAllocated(Collections.singletonList(container2)); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainableAppCallback.drain(); verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); eventHandler.reset(); @@ -1364,7 +1324,6 @@ public class TestContainerReuse { Resource resource1 = Resource.newInstance(1024, 1); String[] host1 = {"host1"}; - String[] host2 = {"host2"}; String []racks = {"/default-rack"}; Priority priority1 = Priority.newInstance(1); @@ -1381,7 +1340,6 @@ public class TestContainerReuse { Container container1 = createContainer(1, "host1", resource1, priority1); // One container allocated. - drainNotifier.set(false); taskScheduler.onContainersAllocated(Collections.singletonList(container1)); drainableAppCallback.drain(); verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11), http://git-wip-us.apache.org/repos/asf/tez/blob/92b20cc2/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index ab85751..d8170e3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -29,11 +29,12 @@ import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -47,7 +48,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -163,7 +163,7 @@ class TestTaskSchedulerHelpers { TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper); taskSchedulers[0] = new TaskSchedulerWrapper( - new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync)); + spy(new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync))); taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0].getTaskScheduler()); } @@ -176,11 +176,8 @@ class TestTaskSchedulerHelpers { public void serviceStart() { instantiateSchedulers("host", 0, "", appContext); // Init the service so that reuse configuration is picked up. - ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig()); - ((AbstractService)taskSchedulerServiceWrappers[0]).start(); - // For some reason, the spy needs to be setup after sertvice startup. - taskSchedulers[0] = new TaskSchedulerWrapper(spy(taskSchedulers[0].getTaskScheduler())); - + taskSchedulerServiceWrappers[0].init(getConfig()); + taskSchedulerServiceWrappers[0].start(); } @Override @@ -191,8 +188,7 @@ class TestTaskSchedulerHelpers { @SuppressWarnings("rawtypes") static class CapturingEventHandler implements EventHandler { - private List<Event> events = new LinkedList<Event>(); - + private Queue<Event> events = new ConcurrentLinkedQueue<Event>(); public void handle(Event event) { events.add(event);
