Repository: incubator-beam
Updated Branches:
  refs/heads/master 5a7bd8083 -> 052857023


[BEAM-90] TestCountingSource can throw on checkpointing


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/320a75b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/320a75b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/320a75b1

Branch: refs/heads/master
Commit: 320a75b1da3bbdb9dc5a30c6d0f6811163bddb85
Parents: d4dcaaa
Author: Mark Shields <markshie...@google.com>
Authored: Wed Mar 2 20:49:53 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Mar 3 15:18:44 2016 -0800

----------------------------------------------------------------------
 .../runners/dataflow/TestCountingSource.java    | 31 +++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/320a75b1/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java
index 181ddca..d0863a4 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java
@@ -27,6 +27,8 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.values.KV;
 
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,31 +48,47 @@ import javax.annotation.Nullable;
  */
 public class TestCountingSource
     extends UnboundedSource<KV<Integer, Integer>, 
TestCountingSource.CounterMark> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestCountingSource.class);
+
   private static List<Integer> finalizeTracker;
   private final int numMessagesPerShard;
   private final int shardNumber;
   private final boolean dedup;
+  private final boolean throwOnFirstSnapshot;
+
+  /**
+   * We only allow an exception to be thrown from getCheckpointMark
+   * at most once. This must be static since the entire TestCountingSource
+   * instance may re-serialized when the pipeline recovers and retries.
+   */
+  private static boolean thrown = false;
 
   public static void setFinalizeTracker(List<Integer> finalizeTracker) {
     TestCountingSource.finalizeTracker = finalizeTracker;
   }
 
   public TestCountingSource(int numMessagesPerShard) {
-    this(numMessagesPerShard, 0, false);
+    this(numMessagesPerShard, 0, false, false);
   }
 
   public TestCountingSource withDedup() {
-    return new TestCountingSource(numMessagesPerShard, shardNumber, true);
+    return new TestCountingSource(numMessagesPerShard, shardNumber, true, 
throwOnFirstSnapshot);
   }
 
   private TestCountingSource withShardNumber(int shardNumber) {
-    return new TestCountingSource(numMessagesPerShard, shardNumber, dedup);
+    return new TestCountingSource(numMessagesPerShard, shardNumber, dedup, 
throwOnFirstSnapshot);
   }
 
-  private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean 
dedup) {
+  public TestCountingSource withThrowOnFirstSnapshot(boolean 
throwOnFirstSnapshot) {
+    return new TestCountingSource(numMessagesPerShard, shardNumber, dedup, 
throwOnFirstSnapshot);
+  }
+
+  private TestCountingSource(
+      int numMessagesPerShard, int shardNumber, boolean dedup, boolean 
throwOnFirstSnapshot) {
     this.numMessagesPerShard = numMessagesPerShard;
     this.shardNumber = shardNumber;
     this.dedup = dedup;
+    this.throwOnFirstSnapshot = throwOnFirstSnapshot;
   }
 
   public int getShardNumber() {
@@ -187,6 +205,11 @@ public class TestCountingSource
 
     @Override
     public CheckpointMark getCheckpointMark() {
+      if (throwOnFirstSnapshot && !thrown) {
+        thrown = true;
+        LOG.error("Throwing exception while checkpointing counter");
+        throw new RuntimeException("failed during checkpoint");
+      }
       return new CounterMark(current);
     }
 

Reply via email to