Respect checkpointing contract
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0236bc18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0236bc18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0236bc18 Branch: refs/heads/master Commit: 0236bc187fb5b633eee14728fc41b6a0b81a3b65 Parents: e953cb0 Author: Mark Shields <[email protected]> Authored: Fri Apr 22 15:53:01 2016 -0700 Committer: Mark Shields <[email protected]> Committed: Mon Apr 25 14:10:50 2016 -0700 ---------------------------------------------------------------------- .../runners/dataflow/TestCountingSource.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0236bc18/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java index a985a31..1ea3521 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java @@ -164,21 +164,20 @@ public class TestCountingSource @Override public boolean start() { - return true; + return advance(); } @Override public boolean advance() { - if (current < numMessagesPerShard - 1) { + if (current >= numMessagesPerShard) { + return false; + } // If testing dedup, occasionally insert a duplicate value; if (dedup && ThreadLocalRandom.current().nextInt(5) == 0) { return true; } current++; - return true; - } else { - return false; - } + return current < numMessagesPerShard; } @Override @@ -222,6 +221,8 @@ public 
class TestCountingSource LOG.error("Throwing exception while checkpointing counter"); throw new RuntimeException("failed during checkpoint"); } + // The checkpoint can assume all records read, including the current, have + // been committed. return new CounterMark(current); } @@ -234,7 +235,12 @@ public class TestCountingSource @Override public CountingSourceReader createReader( PipelineOptions options, @Nullable CounterMark checkpointMark) { - return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : 0); + if (checkpointMark == null) { + LOG.debug("creating reader"); + } else { + LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current); + } + return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1); } @Override
