Stop Double-finalizing checkpoints in the DirectRunner Checkpoints don't need to be finalized before we restore from them.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d56d7451 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d56d7451 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d56d7451 Branch: refs/heads/master Commit: d56d7451a5d7c304d3cb1b5551d918773aec1c65 Parents: e362e6b Author: Thomas Groh <[email protected]> Authored: Mon Mar 13 15:22:24 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Mar 14 10:45:48 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/UnboundedReadEvaluatorFactory.java | 3 --- .../direct/UnboundedReadEvaluatorFactoryTest.java | 13 ++++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index ff59390..69e6920 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -170,9 +170,6 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { UnboundedReader<OutputT> existing = shard.getExistingReader(); if (existing == null) { CheckpointMarkT checkpoint = shard.getCheckpoint(); - if (checkpoint != null) { - checkpoint.finalizeCheckpoint(); - } return shard .getSource() .createReader(evaluationContext.getPipelineOptions(), checkpoint); http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 987f927..7e2d85d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.runners.direct.DirectGraphs.getProducer; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -373,6 +374,9 @@ public class UnboundedReadEvaluatorFactoryTest { secondEvaluator.finishBundle(); assertThat(TestUnboundedSource.readerClosedCount, equalTo(2)); + assertThat( + Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(), + is(true)); } /** @@ -415,9 +419,6 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public UnboundedSource.UnboundedReader<T> createReader( PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) { - if (checkpointMark != null) { - assertThat(checkpointMark.isFinalized(), is(true)); - } return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index); } @@ -517,6 +518,8 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public void finalizeCheckpoint() throws IOException { + checkState( + !finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName()); finalized = true; } @@ -530,14 +533,14 @@ public class UnboundedReadEvaluatorFactoryTest { TestCheckpointMark value, OutputStream outStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { + throws IOException { VarInt.encode(value.index, outStream); } @Override public TestCheckpointMark decode( InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { + throws IOException { return new TestCheckpointMark(VarInt.decodeInt(inStream)); } }
