Repository: incubator-beam Updated Branches: refs/heads/master 5a7bd8083 -> 052857023
[BEAM-90] TestCountingSource can throw on checkpointing Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/320a75b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/320a75b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/320a75b1 Branch: refs/heads/master Commit: 320a75b1da3bbdb9dc5a30c6d0f6811163bddb85 Parents: d4dcaaa Author: Mark Shields <markshie...@google.com> Authored: Wed Mar 2 20:49:53 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Mar 3 15:18:44 2016 -0800 ---------------------------------------------------------------------- .../runners/dataflow/TestCountingSource.java | 31 +++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/320a75b1/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java index 181ddca..d0863a4 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java @@ -27,6 +27,8 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.values.KV; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -46,31 +48,47 @@ import javax.annotation.Nullable; */ public class TestCountingSource extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> { + private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class); + private static List<Integer> finalizeTracker; private final int numMessagesPerShard; private final int shardNumber; private final boolean dedup; + private final boolean throwOnFirstSnapshot; + + /** + * We only allow an exception to be thrown from getCheckpointMark + * at most once. This must be static since the entire TestCountingSource + * instance may re-serialized when the pipeline recovers and retries. + */ + private static boolean thrown = false; public static void setFinalizeTracker(List<Integer> finalizeTracker) { TestCountingSource.finalizeTracker = finalizeTracker; } public TestCountingSource(int numMessagesPerShard) { - this(numMessagesPerShard, 0, false); + this(numMessagesPerShard, 0, false, false); } public TestCountingSource withDedup() { - return new TestCountingSource(numMessagesPerShard, shardNumber, true); + return new TestCountingSource(numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot); } private TestCountingSource withShardNumber(int shardNumber) { - return new TestCountingSource(numMessagesPerShard, shardNumber, dedup); + return new TestCountingSource(numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot); } - private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup) { + public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) { + return new TestCountingSource(numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot); + } + + private TestCountingSource( + int numMessagesPerShard, int shardNumber, boolean dedup, boolean throwOnFirstSnapshot) { this.numMessagesPerShard = numMessagesPerShard; this.shardNumber = shardNumber; this.dedup = dedup; + this.throwOnFirstSnapshot = throwOnFirstSnapshot; } public int getShardNumber() { @@ -187,6 +205,11 @@ public class TestCountingSource @Override public CheckpointMark getCheckpointMark() { + if (throwOnFirstSnapshot && !thrown) { + thrown = true; + LOG.error("Throwing exception while checkpointing counter"); + throw new RuntimeException("failed during checkpoint"); + } return new CounterMark(current); }