Prevent Double-Close in UnboundedReadEvaluatorFactory. Move the actual "close-and-resume" step into the overall try block, to ensure that the reader cannot be double-closed if the first call to close() throws an IOException.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/869002c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/869002c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/869002c3 Branch: refs/heads/master Commit: 869002c397b3a360ab9a9afe0a342a6ac2fe7f9e Parents: 30033cc Author: Thomas Groh <[email protected]> Authored: Tue Mar 14 09:18:57 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 10:46:16 2017 -0700 ---------------------------------------------------------------------- .../direct/UnboundedReadEvaluatorFactory.java | 30 ++++++---- .../UnboundedReadEvaluatorFactoryTest.java | 61 ++++++++++++++++++-- 2 files changed, 77 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 69e6920..7c3d50a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -139,7 +139,24 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { numElements++; } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); Instant watermark = reader.getWatermark(); - UnboundedSourceShard<OutputT, CheckpointMarkT> residual = finishRead(reader, shard); + + CheckpointMarkT finishedCheckpoint = finishRead(reader, shard); + UnboundedSourceShard<OutputT, CheckpointMarkT> residual; + // Sometimes resume from a checkpoint 
even if it's not required + if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) { + UnboundedReader<OutputT> toClose = reader; + // Prevent double-close. UnboundedReader is AutoCloseable, which does not require + // idempotency of close. Nulling out the reader here prevents trying to re-close it + // if the call to close throws an IOException. + reader = null; + toClose.close(); + residual = + UnboundedSourceShard.of( + shard.getSource(), shard.getDeduplicator(), null, finishedCheckpoint); + } else { + residual = shard.withCheckpoint(finishedCheckpoint); + } + resultBuilder .addOutput(output) .addUnprocessedElements( @@ -192,7 +209,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * Checkpoint the current reader, finalize the previous checkpoint, and return the residual * {@link UnboundedSourceShard}. */ - private UnboundedSourceShard<OutputT, CheckpointMarkT> finishRead( + private CheckpointMarkT finishRead( UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException { final CheckpointMark oldMark = shard.getCheckpoint(); @@ -223,14 +240,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } }); } - - // Sometimes resume from a checkpoint even if it's not required - if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) { - reader.close(); - return UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), null, mark); - } else { - return shard.withCheckpoint(mark); - } + return mark; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 7e2d85d..cdb362f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -76,6 +76,7 @@ import org.joda.time.ReadableInstant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.invocation.InvocationOnMock; @@ -96,8 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest { private UnboundedSource<Long, ?> source; private DirectGraph graph; - @Rule - public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Before public void setup() { @@ -379,6 +380,38 @@ public class UnboundedReadEvaluatorFactoryTest { is(true)); } + @Test + public void evaluatorThrowsInCloseRethrows() throws Exception { + ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs()); + TestUnboundedSource<Long> source = + new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0])) + .throwsOnClose(); + + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = + DirectGraphs.getGraph(p).getProducer(pcollection); + + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); + UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); + when(context.createBundle(pcollection)).thenReturn(output); + + WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard = + WindowedValue.valueInGlobalWindow( + UnboundedSourceShard.unstarted(source, 
NeverDeduplicator.create())); + CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle = + bundleFactory + .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle() + .add(shard) + .commit(Instant.now()); + UnboundedReadEvaluatorFactory factory = + new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */); + TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = + factory.forApplication(sourceTransform, inputBundle); + thrown.expect(IOException.class); + thrown.expectMessage("throws on close"); + evaluator.processElement(shard); + } + /** * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where * the timestamp is the epoch offset by the value of the element. @@ -402,12 +435,18 @@ public class UnboundedReadEvaluatorFactoryTest { private final Coder<T> coder; private final List<T> elems; private boolean dedupes = false; + private boolean throwOnClose; public TestUnboundedSource(Coder<T> coder, T... elems) { + this(coder, false, Arrays.asList(elems)); + } + + private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) { readerAdvancedCount = 0; readerClosedCount = 0; this.coder = coder; - this.elems = Arrays.asList(elems); + this.elems = elems; + this.throwOnClose = throwOnClose; } @Override @@ -441,9 +480,14 @@ public class UnboundedReadEvaluatorFactoryTest { return coder; } + public TestUnboundedSource<T> throwsOnClose() { + return new TestUnboundedSource<>(coder, true, elems); + } + private class TestUnboundedReader extends UnboundedReader<T> { private final List<T> elems; private int index; + private boolean closed = false; public TestUnboundedReader(List<T> elems, int startIndex) { this.elems = elems; @@ -503,7 +547,16 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public void close() throws IOException { - readerClosedCount++; + try { + readerClosedCount++; + // Enforce the AutoCloseable contract. Close is not idempotent. 
+ assertThat(closed, is(false)); + if (throwOnClose) { + throw new IOException(String.format("%s throws on close", TestUnboundedSource.this)); + } + } finally { + closed = true; + } } } }
