Repository: tez
Updated Branches:
  refs/heads/master ad8a80d2b -> db6f05f7f
TEZ-3695. TestTezSharedExecutor fails sporadically. (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db6f05f7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db6f05f7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db6f05f7 Branch: refs/heads/master Commit: db6f05f7fc7512d37e835f785fc8995b16d4c63b Parents: ad8a80d Author: Jason Lowe <[email protected]> Authored: Tue Apr 25 09:55:34 2017 -0500 Committer: Jason Lowe <[email protected]> Committed: Tue Apr 25 09:55:34 2017 -0500 ---------------------------------------------------------------------- .../tez/common/TestTezSharedExecutor.java | 52 +++++++------------- 1 file changed, 18 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/db6f05f7/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java index 8d87846..9ea07ef 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java @@ -23,10 +23,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -57,16 +56,14 @@ public class TestTezSharedExecutor { } private static class Wait implements Runnable { - private final Object ref; - Wait(Object ref) { - this.ref = ref == 
null ? this : ref; + private final CountDownLatch latch; + Wait(CountDownLatch latch) { + this.latch = latch; } @Override public void run() { try { - synchronized (ref) { - ref.wait(); - } + latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -113,12 +110,6 @@ public class TestTezSharedExecutor { } } - private void _notify(Object obj) { - synchronized (obj) { - obj.notify(); - } - } - private TezSharedExecutor sharedExecutor; @Before @@ -132,7 +123,7 @@ public class TestTezSharedExecutor { sharedExecutor = null; } - @Test(timeout=2000) + @Test(timeout=10000) public void testSimpleExecution() throws Exception { ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>(); @@ -167,42 +158,35 @@ public class TestTezSharedExecutor { } } - @Test(timeout=5000) + @Test(timeout=10000) public void testAwaitTermination() throws Exception { ExecutorService service = sharedExecutor.createExecutorService(1, "await-termination"); + CountDownLatch latch = new CountDownLatch(1); - final Runnable runnable = new Wait(null); + final Runnable runnable = new Wait(latch); service.submit(runnable); service.shutdown(); - // No notify sent hence it should fail. + // Task stuck on latch hence it should fail to terminate. Assert.assertFalse(service.awaitTermination(100, TimeUnit.MILLISECONDS)); Assert.assertFalse(service.isTerminated()); Assert.assertTrue(service.isShutdown()); - Timer timer = new Timer(true); - timer.schedule(new TimerTask() { - @Override - public void run() { - _notify(runnable); - } - }, 100); + latch.countDown(); - // Highly unlikely that there are intermittent failures, but a possiblity :-(. 
- Assert.assertTrue(service.awaitTermination(1, TimeUnit.SECONDS)); + Assert.assertTrue(service.awaitTermination(5, TimeUnit.SECONDS)); Assert.assertTrue(service.isTerminated()); Assert.assertTrue(service.isShutdown()); - - timer.cancel(); } - @Test(timeout=2000) + @Test(timeout=10000) public void testSerialExecution() throws Exception { ExecutorService service = sharedExecutor.createExecutorService(1, "serial-test"); + CountDownLatch latch = new CountDownLatch(1); // Since it is serial we should never get concurrent modification exception too. + Future<?> f1 = service.submit(new Wait(latch)); List<Integer> list = new ArrayList<>(); - Future<?> f1 = service.submit(new Wait(list)); List<Future<?>> futures = new ArrayList<>(); for (int i = 0; i < 10; ++i) { futures.add(service.submit(new Appender<Integer>(list, i))); @@ -211,9 +195,9 @@ public class TestTezSharedExecutor { // This shutdown does not prevent already submitted tasks from completing. service.shutdown(); - // Until we notify nothing moves forward. + // Until we release the task from the latch nothing moves forward. Assert.assertEquals(0, list.size()); - _notify(list); + latch.countDown(); f1.get(); // Wait for all futures to finish. @@ -224,7 +208,7 @@ public class TestTezSharedExecutor { Assert.assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), list); } - @Test(timeout=5000) + @Test(timeout=10000) public void testParallelExecution() throws Exception { ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
