Block earlier in BoundedReadEvaluatorFactoryTest This ensures that the reader doesn't claim the split point, which in turn ensures the dynamic split request will not be refused by the OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits flakes, as if the reader is faster than the split thread it can run past the point at which the splitter thread will attempt to split the source, which causes the reader to read all of the elements.
Sleep within TestReader#advanceImpl if the reader is being dynamically split, to ensure that the dynamic split fully completes before continuing a call to advance. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a8d32e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a8d32e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a8d32e5 Branch: refs/heads/gearpump-runner Commit: 4a8d32e5d3726b851329d507a8d0392cc03f6e85 Parents: 1543ea9 Author: Thomas Groh <[email protected]> Authored: Thu Nov 17 10:56:49 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Thu Nov 17 14:37:47 2016 -0800 ---------------------------------------------------------------------- .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a8d32e5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 9d8503a..e956c34 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.OffsetBasedSource; import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -142,9 +143,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal(); Collection<CommittedBundle<?>> unreadInputs = - new BoundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(transform, - 1); + new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1); Collection<WindowedValue<?>> outputs = new ArrayList<>(); int numIterations = 0; @@ -155,8 +154,7 @@ public class BoundedReadEvaluatorFactoryTest { Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>(); for (CommittedBundle<?> shardBundle : unreadInputs) { - TransformEvaluator<?> evaluator = - factory.forApplication(transform, null); + TransformEvaluator<?> evaluator = factory.forApplication(transform, null); for (WindowedValue<?> shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } @@ -178,8 +176,6 @@ public class BoundedReadEvaluatorFactoryTest { unreadInputs = newUnreadInputs; } - // We produced at least one split before we read 1000 elements, as we will attempt to split as - // quickly as possible. assertThat(numIterations, greaterThan(1)); WindowedValue[] expectedValues = new WindowedValue[numElements]; for (long i = 0L; i < numElements; i++) { @@ -343,7 +339,7 @@ public class BoundedReadEvaluatorFactoryTest { private static boolean readerClosed; private final Coder<T> coder; private final T[] elems; - private final int awaitSplitIndex; + private final int firstSplitIndex; private transient CountDownLatch subrangesCompleted; @@ -351,11 +347,11 @@ public class BoundedReadEvaluatorFactoryTest { this(coder, elems.length, elems); } - public TestSource(Coder<T> coder, int awaitSplitIndex, T... elems) { + public TestSource(Coder<T> coder, int firstSplitIndex, T... elems) { super(0L, elems.length, 1L); this.elems = elems; this.coder = coder; - this.awaitSplitIndex = awaitSplitIndex; + this.firstSplitIndex = firstSplitIndex; readerClosed = false; subrangesCompleted = new CountDownLatch(2); @@ -380,7 +376,7 @@ public class BoundedReadEvaluatorFactoryTest { @Override public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException { subrangesCompleted = new CountDownLatch(2); - return new TestReader<>(this, awaitSplitIndex, subrangesCompleted); + return new TestReader<>(this, firstSplitIndex, subrangesCompleted); } @Override @@ -405,6 +401,7 @@ public class BoundedReadEvaluatorFactoryTest { } private static class TestReader<T> extends OffsetBasedReader<T> { + private final Source<T> initialSource; private final int sleepIndex; private final CountDownLatch dynamicallySplit; @@ -412,6 +409,7 @@ public class BoundedReadEvaluatorFactoryTest { TestReader(OffsetBasedSource<T> source, int sleepIndex, CountDownLatch dynamicallySplit) { super(source); + this.initialSource = source; this.sleepIndex = sleepIndex; this.dynamicallySplit = dynamicallySplit; this.index = -1; @@ -434,9 +432,13 @@ public class BoundedReadEvaluatorFactoryTest { @Override public boolean advanceImpl() throws IOException { - if (index == sleepIndex) { + // Sleep before the sleep/split index is claimed so long as it will be claimed + if (index + 1 == sleepIndex && sleepIndex < getCurrentSource().elems.length) { try { dynamicallySplit.await(); + while (initialSource.equals(getCurrentSource())) { + // Spin until the current source is updated + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e);
