TEZ-2238. TestContainerReuse flaky (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/17d23888 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/17d23888 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/17d23888 Branch: refs/heads/TEZ-2003 Commit: 17d238880b610563451defb9484b891c4ea617bb Parents: a4f6dbc Author: Bikas Saha <[email protected]> Authored: Fri Mar 27 01:35:17 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Mar 27 01:35:17 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/rm/YarnTaskSchedulerService.java | 2 +- .../tez/dag/app/rm/TestContainerReuse.java | 35 ++++++++++++++++++++ .../dag/app/rm/TestTaskSchedulerHelpers.java | 1 - 4 files changed, 37 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/17d23888/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bd7f337..91653bf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -253,6 +253,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2238. TestContainerReuse flaky TEZ-2217. The min-held-containers being released prematurely TEZ-2214. FetcherOrderedGrouped can get stuck indefinitely when MergeManager misses memToDiskMerging TEZ-1923. FetcherOrderedGrouped gets into infinite loop due to memory pressure http://git-wip-us.apache.org/repos/asf/tez/blob/17d23888/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 6c31349..ff90e5d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -1831,8 +1831,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService try { // test only signaling to make TestTaskScheduler work if (drainedDelayedContainersForTest != null) { - drainedDelayedContainersForTest.set(true); synchronized (drainedDelayedContainersForTest) { + drainedDelayedContainersForTest.set(true); drainedDelayedContainersForTest.notifyAll(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/17d23888/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 8fde49b..c70003b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -110,6 +110,7 @@ public class TestContainerReuse { @Test(timeout = 15000l) public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testDelayedReuseContainerBecomesAvailable"); Configuration conf = new Configuration(new YarnConfiguration()); conf.setBoolean( TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); @@ -120,6 +121,7 @@ public class TestContainerReuse { conf.setLong( TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 3000l); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); @@ -195,8 +197,12 @@ public class TestContainerReuse { createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrTa11); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrTa21); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container containerHost1 = createContainer(1, host1[0], resource, priority); Container containerHost2 = createContainer(2, host2[0], resource, priority); @@ -252,12 +258,14 @@ public class TestContainerReuse { @Test(timeout = 15000l) public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testDelayedReuseContainerNotAvailable"); Configuration conf = new Configuration(new YarnConfiguration()); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1000l); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); @@ -326,8 +334,12 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent( taID31, ta31, resource, host1, defaultRack, priority); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrTa11); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrTa21); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container containerHost1 = createContainer(1, host1[0], resource, priority); Container containerHost2 = createContainer(2, host2[0], resource, priority); @@ -360,11 +372,13 @@ public class TestContainerReuse { @Test(timeout = 10000l) public void testSimpleReuse() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testSimpleReuse"); Configuration tezConf = new Configuration(new YarnConfiguration()); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); @@ -495,11 +509,13 @@ public class TestContainerReuse { @Test(timeout = 10000l) public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testReuseWithTaskSpecificLaunchCmdOption"); Configuration tezConf = new Configuration(new YarnConfiguration()); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); + tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); //Profile 3 tasks tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST, "v1[1,3,4]"); tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__"); @@ -573,8 +589,12 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent1); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent2); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); @@ -617,8 +637,12 @@ public class TestContainerReuse { priority1, localResources, tsLaunchCmdOpts); Container container2 = createContainer(2, "host2", resource1, priority1); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent3); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent4); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); // Container started drainNotifier.set(false); @@ -687,6 +711,7 @@ public class TestContainerReuse { @Test(timeout = 30000l) public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testReuseNonLocalRequest"); Configuration tezConf = new Configuration(new YarnConfiguration()); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); @@ -764,7 +789,9 @@ public class TestContainerReuse { taID12, ta12, resource1, emptyHosts, racks, priority); // Send launch request for task 1 only, deterministic assignment to this task. + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent11); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "randomHost", resource1, priority); @@ -815,6 +842,7 @@ public class TestContainerReuse { @Test(timeout = 30000l) public void testReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testReuseAcrossVertices"); Configuration tezConf = new Configuration(new YarnConfiguration()); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); tezConf.setLong( @@ -897,7 +925,9 @@ public class TestContainerReuse { taID21, ta21, resource1, host1, racks, priority2); // Send launch request for task 1 onle, deterministic assignment to this task. + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent11); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, host1[0], resource1, priority1); @@ -940,6 +970,7 @@ public class TestContainerReuse { @Test(timeout = 30000l) public void testReuseLocalResourcesChanged() throws IOException, InterruptedException, ExecutionException { + LOG.info("Test testReuseLocalResourcesChanged"); Configuration tezConf = new Configuration(new YarnConfiguration()); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); @@ -1015,8 +1046,12 @@ public class TestContainerReuse { TaskAttempt ta112 = mock(TaskAttempt.class); AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent11); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + drainNotifier.set(false); taskSchedulerEventHandler.handleEvent(lrEvent12); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); Container container1 = createContainer(1, "host1", resource1, priority1); http://git-wip-us.apache.org/repos/asf/tez/blob/17d23888/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index bec5320..bb44889 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -319,7 +319,6 @@ class TestTaskSchedulerHelpers { } else { fail("Timed out while trying to drain queue"); } - } } }
