Be more accepting in UnboundedReadDeduplicatorTest. Don't depend on all the threads failing. Instead, assert that exactly one success was encountered, and that we saw at most numThreads - 1 failures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8052b6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8052b6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8052b6e Branch: refs/heads/gearpump-runner Commit: c8052b6e1b65e37341b41e827f95b31e0df0be99 Parents: 958f3fe Author: Thomas Groh <tg...@google.com> Authored: Fri Sep 2 10:43:43 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:13 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/UnboundedReadDeduplicatorTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8052b6e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java index 7d2a95c..0aa2c49 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertThat; import java.util.concurrent.CountDownLatch; @@ -60,18 +61,18 @@ public class UnboundedReadDeduplicatorTest { byte[] id = new byte[] {-1, 2, 4, 22}; UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create(); final CountDownLatch 
startSignal = new CountDownLatch(1); - int numThreads = 1000; + int numThreads = 50; final CountDownLatch readyLatch = new CountDownLatch(numThreads); final CountDownLatch finishLine = new CountDownLatch(numThreads); ExecutorService executor = Executors.newCachedThreadPool(); AtomicInteger successCount = new AtomicInteger(); - AtomicInteger failureCount = new AtomicInteger(); + AtomicInteger noOutputCount = new AtomicInteger(); for (int i = 0; i < numThreads; i++) { executor.submit(new TryOutputIdRunnable(dedupper, id, successCount, - failureCount, + noOutputCount, readyLatch, startSignal, finishLine)); @@ -82,8 +83,10 @@ public class UnboundedReadDeduplicatorTest { finishLine.await(10L, TimeUnit.SECONDS); executor.shutdownNow(); + // The first thread to run will succeed, and no others will assertThat(successCount.get(), equalTo(1)); - assertThat(failureCount.get(), equalTo(numThreads - 1)); + // The threads may not all complete; all of the threads that do not succeed must not output + assertThat(noOutputCount.get(), lessThan(numThreads)); } private static class TryOutputIdRunnable implements Runnable {