Repository: incubator-beam Updated Branches: refs/heads/master d39346823 -> 5b5c0e28f
Close Readers in InProcess Read Evaluators. The readers were formerly left open, which prevented any resources they held from being released. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fad6da89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fad6da89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fad6da89 Branch: refs/heads/master Commit: fad6da89079791952a937aed257f0d2db1467053 Parents: d393468 Author: Thomas Groh <tg...@google.com> Authored: Tue Mar 15 11:50:38 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Mar 17 21:01:18 2016 -0700 ---------------------------------------------------------------------- .../inprocess/BoundedReadEvaluatorFactory.java | 49 ++++-- .../UnboundedReadEvaluatorFactory.java | 53 +++--- .../BoundedReadEvaluatorFactoryTest.java | 136 ++++++++++++++- .../UnboundedReadEvaluatorFactoryTest.java | 168 +++++++++++++++++++ 4 files changed, 366 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index 2a164c3..eaea3ed 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -15,6 +15,8 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.io.BoundedSource; +import
com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader; import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; @@ -78,8 +80,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { @SuppressWarnings("unchecked") private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - final InProcessEvaluationContext evaluationContext) - throws IOException { + final InProcessEvaluationContext evaluationContext) { // Key by the application and the context the evaluation is occurring in (which call to // Pipeline#run). EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); @@ -101,21 +102,25 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { return evaluatorQueue; } + /** + * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource}, + * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator + * creates the {@link BoundedReader} and consumes all available input. + * + * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and + * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source + * may produce duplicate elements. 
+ */ private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform; private final InProcessEvaluationContext evaluationContext; - private final Reader<OutputT> reader; private boolean contentsRemaining; public BoundedReadEvaluator( AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - InProcessEvaluationContext evaluationContext) - throws IOException { + InProcessEvaluationContext evaluationContext) { this.transform = transform; this.evaluationContext = evaluationContext; - reader = - transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions()); - contentsRemaining = reader.start(); } @Override @@ -123,17 +128,25 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { @Override public InProcessTransformResult finishBundle() throws IOException { - UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput()); - while (contentsRemaining) { - output.add( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp())); - contentsRemaining = reader.advance(); + try (final Reader<OutputT> reader = + transform + .getTransform() + .getSource() + .createReader(evaluationContext.getPipelineOptions());) { + contentsRemaining = reader.start(); + UncommittedBundle<OutputT> output = + evaluationContext.createRootBundle(transform.getOutput()); + while (contentsRemaining) { + output.add( + WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp())); + contentsRemaining = reader.advance(); + } + reader.close(); + return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE) + .addOutput(output) + .build(); } - return StepTransformResult - .withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE) - .addOutput(output) - .build(); } } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index 97f0e25..549afab 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -99,6 +99,16 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { return evaluatorQueue; } + /** + * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource}, + * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator + * creates the {@link UnboundedReader} and consumes some currently available input. + * + * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be + * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own + * checkpoint, and constructs its reader from the current checkpoint in each call to + * {@link #finishBundle()}. 
+ */ private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { private static final int ARBITRARY_MAX_ELEMENTS = 10; private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform; @@ -122,28 +132,29 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { @Override public InProcessTransformResult finishBundle() throws IOException { UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput()); - UnboundedReader<OutputT> reader = - createReader( - transform.getTransform().getSource(), evaluationContext.getPipelineOptions()); - int numElements = 0; - if (reader.start()) { - do { - output.add( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp())); - numElements++; - } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); + try (UnboundedReader<OutputT> reader = + createReader( + transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) { + int numElements = 0; + if (reader.start()) { + do { + output.add( + WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp())); + numElements++; + } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); + } + checkpointMark = reader.getCheckpointMark(); + checkpointMark.finalizeCheckpoint(); + // TODO: When exercising create initial splits, make this the minimum watermark across all + // existing readers + StepTransformResult result = + StepTransformResult.withHold(transform, reader.getWatermark()) + .addOutput(output) + .build(); + evaluatorQueue.offer(this); + return result; } - checkpointMark = reader.getCheckpointMark(); - checkpointMark.finalizeCheckpoint(); - // TODO: When exercising create initial splits, make this the minimum watermark across all - // existing readers - StepTransformResult result = - StepTransformResult.withHold(transform, reader.getWatermark()) - 
.addOutput(output) - .build(); - evaluatorQueue.offer(this); - return result; } private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index 4395514..e641dd6 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -18,24 +18,39 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; +import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.io.BoundedSource; +import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader; import com.google.cloud.dataflow.sdk.io.CountingSource; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.Read.Bounded; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import 
com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.collect.ImmutableList; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; /** * Tests for {@link BoundedReadEvaluatorFactory}. @@ -45,7 +60,7 @@ public class BoundedReadEvaluatorFactoryTest { private BoundedSource<Long> source; private PCollection<Long> longs; private TransformEvaluatorFactory factory; - private InProcessEvaluationContext context; + @Mock private InProcessEvaluationContext context; @Before public void setup() { @@ -146,6 +161,125 @@ public class BoundedReadEvaluatorFactoryTest { gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); } + @Test + public void boundedSourceEvaluatorClosesReader() throws Exception { + TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L); + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + when(context.createRootBundle(pcollection)).thenReturn(output); + + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + evaluator.finishBundle(); + CommittedBundle<Long> committed = output.commit(Instant.now()); + assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L))); + assertThat(TestSource.readerClosed, is(true)); + } + + @Test + public void boundedSourceEvaluatorNoElementsClosesReader() throws 
Exception { + TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of()); + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + when(context.createRootBundle(pcollection)).thenReturn(output); + + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + evaluator.finishBundle(); + CommittedBundle<Long> committed = output.commit(Instant.now()); + assertThat(committed.getElements(), emptyIterable()); + assertThat(TestSource.readerClosed, is(true)); + } + + private static class TestSource<T> extends BoundedSource<T> { + private static boolean readerClosed; + private final Coder<T> coder; + private final T[] elems; + + public TestSource(Coder<T> coder, T... elems) { + this.elems = elems; + this.coder = coder; + readerClosed = false; + } + + @Override + public List<? extends BoundedSource<T>> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return ImmutableList.of(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 0; + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return false; + } + + @Override + public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException { + return new TestReader<>(this, elems); + } + + @Override + public void validate() { + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return coder; + } + } + + private static class TestReader<T> extends BoundedReader<T> { + private final BoundedSource<T> source; + private final List<T> elems; + private int index; + + public TestReader(BoundedSource<T> source, T... 
elems) { + this.source = source; + this.elems = Arrays.asList(elems); + this.index = -1; + } + + @Override + public BoundedSource<T> getCurrentSource() { + return source; + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (elems.size() > index + 1) { + index++; + return true; + } + return false; + } + + @Override + public T getCurrent() throws NoSuchElementException { + return elems.get(index); + } + + @Override + public void close() throws IOException { + TestSource.readerClosed = true; + } + } + private static WindowedValue<Long> gw(Long elem) { return WindowedValue.valueInGlobalWindow(elem); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java index a9bbcc8..20a7d60 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java @@ -18,20 +18,30 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.cloud.dataflow.sdk.coders.AtomicCoder; +import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; +import 
com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.io.CountingSource; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.collect.ImmutableList; import org.hamcrest.Matchers; import org.joda.time.DateTime; @@ -41,6 +51,15 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; /** * Tests for {@link UnboundedReadEvaluatorFactory}. 
*/ @@ -111,6 +130,41 @@ public class UnboundedReadEvaluatorFactoryTest { tgw(15L), tgw(13L), tgw(10L))); } + @Test + public void boundedSourceEvaluatorClosesReader() throws Exception { + TestUnboundedSource<Long> source = + new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L); + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + when(context.createRootBundle(pcollection)).thenReturn(output); + + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + evaluator.finishBundle(); + CommittedBundle<Long> committed = output.commit(Instant.now()); + assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3)); + assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); + } + + @Test + public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { + TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of()); + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + when(context.createRootBundle(pcollection)).thenReturn(output); + + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + evaluator.finishBundle(); + CommittedBundle<Long> committed = output.commit(Instant.now()); + assertThat(committed.getElements(), emptyIterable()); + assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); + } + // TODO: Once the source is split into multiple sources before evaluating, this test will have to // be updated. 
/** @@ -156,4 +210,118 @@ public class UnboundedReadEvaluatorFactoryTest { return new Instant(input); } } + + private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> { + static int readerClosedCount; + private final Coder<T> coder; + private final List<T> elems; + + public TestUnboundedSource(Coder<T> coder, T... elems) { + readerClosedCount = 0; + this.coder = coder; + this.elems = Arrays.asList(elems); + } + + @Override + public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return ImmutableList.of(this); + } + + @Override + public UnboundedSource.UnboundedReader<T> createReader( + PipelineOptions options, TestCheckpointMark checkpointMark) { + return new TestUnboundedReader(elems); + } + + @Override + @Nullable + public Coder<TestCheckpointMark> getCheckpointMarkCoder() { + return new TestCheckpointMark.Coder(); + } + + @Override + public void validate() {} + + @Override + public Coder<T> getDefaultOutputCoder() { + return coder; + } + + private class TestUnboundedReader extends UnboundedReader<T> { + private final List<T> elems; + private int index; + + public TestUnboundedReader(List<T> elems) { + this.elems = elems; + this.index = -1; + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (index + 1 < elems.size()) { + index++; + return true; + } + return false; + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return new TestCheckpointMark(); + } + + @Override + public UnboundedSource<T, ?> getCurrentSource() { + TestUnboundedSource<T> source = TestUnboundedSource.this; + return source; + } + + @Override + public T getCurrent() throws NoSuchElementException { + return elems.get(index); + } + + @Override + public Instant 
getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + readerClosedCount++; + } + } + } + + private static class TestCheckpointMark implements CheckpointMark { + @Override + public void finalizeCheckpoint() throws IOException {} + + public static class Coder extends AtomicCoder<TestCheckpointMark> { + @Override + public void encode( + TestCheckpointMark value, + OutputStream outStream, + com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException {} + + @Override + public TestCheckpointMark decode( + InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context) + throws CoderException, IOException { + return new TestCheckpointMark(); + } + } + } }