Repository: incubator-beam
Updated Branches:
  refs/heads/master d790dfe1b -> 49f944430


[BEAM-735] PAssertStreaming should make sure the assertion happened.

PAssertStreaming shuold check an assertion happened.

Test assert for skipped assertion..

This name is more true to the natureof this test.

Fix according to new PAssertStreaming.

Make graceful stop optional (streaming) as it may cause tests to repeat becuase 
the context hasn't
died yet.

Use PAssert aggregator names for success and failure.

Stop gracefully to let checkpointing complete.

No need to explicitly call close() in batch pipelines, especially for testing 
where context is
reused anyway.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44225cff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44225cff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44225cff

Branch: refs/heads/master
Commit: 44225cffc0230968738a0d2975f494bbe3238a73
Parents: d790dfe
Author: Sela <ans...@paypal.com>
Authored: Sun Oct 9 21:38:14 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Fri Oct 14 16:54:14 2016 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/EvaluationResult.java    |   4 +-
 .../spark/translation/EvaluationContext.java    |   3 +-
 .../streaming/StreamingEvaluationContext.java   |   6 +-
 .../apache/beam/runners/spark/DeDupTest.java    |   3 +-
 .../beam/runners/spark/EmptyInputTest.java      |   1 -
 .../beam/runners/spark/SimpleWordCountTest.java |   6 +-
 .../apache/beam/runners/spark/TfIdfTest.java    |   3 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   4 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   1 -
 .../spark/translation/CombinePerKeyTest.java    |   1 -
 .../spark/translation/DoFnOutputTest.java       |   4 +-
 .../translation/MultiOutputWordCountTest.java   |   2 -
 .../spark/translation/SerializationTest.java    |   4 +-
 .../translation/WindowedWordCountTest.java      |  10 +-
 .../streaming/EmptyStreamAssertionTest.java     |  76 ++++++++
 .../streaming/FlattenStreamingTest.java         |  11 +-
 .../streaming/KafkaStreamingTest.java           |   6 +-
 .../RecoverFromCheckpointStreamingTest.java     | 179 ------------------
 .../ResumeFromCheckpointStreamingTest.java      | 182 +++++++++++++++++++
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 .../streaming/utils/PAssertStreaming.java       |  87 ++++++---
 23 files changed, 346 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
index 5fd6bd4..52606a3 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
@@ -60,6 +60,8 @@ public interface EvaluationResult extends PipelineResult {
    * this EvaluationResult; once close() has been called,
    * {@link EvaluationResult#get(PCollection)} might
    * not work for subsequent calls.
+   *
+   * @param gracefully true if Spark should finish all ongoing work before 
closing.
    */
-  void close();
+  void close(boolean gracefully);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 1944b6b..317c7be 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -283,7 +283,8 @@ public class EvaluationContext implements EvaluationResult {
   }
 
   @Override
-  public void close() {
+  public void close(boolean gracefully) {
+    // graceful stop is used for streaming.
     SparkContextFactory.stopSparkContext(jsc);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 5a43c55..0b32dfd 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -178,7 +178,7 @@ public class StreamingEvaluationContext extends 
EvaluationContext {
   }
 
   @Override
-  public void close() {
+  public void close(boolean gracefully) {
     if (timeout > 0) {
       jssc.awaitTerminationOrTimeout(timeout);
     } else {
@@ -186,9 +186,9 @@ public class StreamingEvaluationContext extends 
EvaluationContext {
     }
     // stop streaming context gracefully, so checkpointing (and other 
computations) get to
     // finish before shutdown.
-    jssc.stop(false, true);
+    jssc.stop(false, gracefully);
     state = State.DONE;
-    super.close();
+    super.close(false);
   }
 
   private State state = State.RUNNING;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
index 2d06a20..6c26e76 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java
@@ -53,7 +53,6 @@ public class DeDupTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SET);
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index 90ce427..765dd66 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -48,7 +48,6 @@ public class EmptyInputTest {
 
     EvaluationResult res = (EvaluationResult) p.run();
     assertEquals("", Iterables.getOnlyElement(res.get(output)));
-    res.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 238d7ba..2d13fdd 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -78,8 +78,7 @@ public class SimpleWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
 
     assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
   }
@@ -100,8 +99,7 @@ public class SimpleWordCountTest {
     File outputFile = testFolder.newFile();
     output.apply("WriteCounts", 
TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
 
     assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
         containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index d911bfb..ac9310d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -70,8 +70,7 @@ public class TfIdfTest {
 
     PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", 
"c", "d"));
 
-    EvaluationResult res = (EvaluationResult) pipeline.run();
-    res.close();
+    pipeline.run();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 413a71c..fc53dbd 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,7 +33,6 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
@@ -78,8 +77,7 @@ public class AvroPipelineTest {
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
 
     List<GenericRecord> records = readGenericFile();
     assertEquals(Lists.newArrayList(savedRecord), records);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 81803c3..0ff30ac 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.examples.WordCount;
@@ -76,8 +75,7 @@ public class NumShardsTest {
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
 
     int count = 0;
     Set<String> expected = Sets.newHashSet("hi: 5", "there: 1", "sue: 2", 
"bob: 2");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index b14465d..aa1e1ce 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.beam.sdk.Pipeline;
@@ -90,8 +89,7 @@ public class HadoopFileFormatPipelineTest {
     HadoopIO.Write.Bound<IntWritable, Text> write = 
HadoopIO.Write.to(outputFile.getAbsolutePath(),
         outputFormatClass, IntWritable.class, Text.class);
     input.apply(write.withoutSharding());
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
 
     IntWritable key = new IntWritable();
     Text value = new Text();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index 8022d06..bd998f2 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -55,7 +55,6 @@ public class CombineGloballyTest {
     EvaluationResult res = (EvaluationResult) p.run();
     assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
         Iterables.getOnlyElement(res.get(output)));
-    res.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 281144f..b1012a3 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -59,7 +59,6 @@ public class CombinePerKeyTest {
         for (KV<String, Long> kv : res.get(cnts)) {
             actualCnts.put(kv.getKey(), kv.getValue());
         }
-        res.close();
         Assert.assertEquals(8, actualCnts.size());
         Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the"));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 31e0dd8..b94ca4d 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.spark.translation;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -61,7 +60,6 @@ public class DoFnOutputTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder("start", "a", "finish");
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index acfa3df..f308f2f 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -95,8 +95,6 @@ public class MultiOutputWordCountTest {
     AggregatorValues<Integer> aggregatorValues = 
res.getAggregatorValues(countWords
         .getTotalWordsAggregator());
     Assert.assertEquals(18, 
Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
-
-    res.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 22a40cd..d8b4a20 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
@@ -137,8 +136,7 @@ public class SerializationTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 69b2943..e727d2f 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.spark.translation;
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
@@ -67,8 +66,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
   }
 
   private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET =
@@ -89,8 +87,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
   }
 
   private static final List<String> EXPECTED_SLIDING_COUNT_SET =
@@ -113,8 +110,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
 
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    p.run();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
new file mode 100644
index 0000000..1560c66
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.Collections;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.io.CreateStream;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+
+
+
+/**
+ * Test that {@link PAssertStreaming} can tell if the stream is empty.
+ */
+public class EmptyStreamAssertionTest implements Serializable {
+
+  private static final String EXPECTED_ERR =
+      "Success aggregator should be greater than zero.\n"
+          + "Expected: not <0>\n"
+          + "     but: was <0>";
+
+  @Rule
+  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+
+  @Test
+  public void testFixedWindows() throws Exception {
+    SparkPipelineOptions options = commonOptions.getOptions();
+    Duration windowDuration = new Duration(options.getBatchIntervalMillis());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> output =
+        pipeline
+            
.apply(CreateStream.fromQueue(Collections.<Iterable<String>>emptyList()))
+            .setCoder(StringUtf8Coder.of())
+            .apply(Window.<String>into(FixedWindows.of(windowDuration)));
+
+    try {
+      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0]);
+    } catch (AssertionError e) {
+      assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + 
e.getMessage(),
+          e.getMessage().equals(EXPECTED_ERR));
+      return;
+    }
+    fail("assertion should have failed");
+    throw new RuntimeException("unreachable");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 8210b0d..e6872f1 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -77,10 +76,7 @@ public class FlattenStreamingTest {
     PCollectionList<String> list = 
PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssertStreaming.assertContents(union, EXPECTED_UNION);
-
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
   }
 
   @Test
@@ -100,10 +96,7 @@ public class FlattenStreamingTest {
     PCollectionList<String> list = 
PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssertStreaming.assertContents(union, EXPECTED_UNION);
-
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index caf5d13..d72e70e 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import kafka.serializer.StringDecoder;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.io.KafkaIO;
 import 
org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
@@ -107,10 +106,7 @@ public class KafkaStreamingTest {
 
     PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new 
FormatKVFn()));
 
-    PAssertStreaming.assertContents(formattedKV, EXPECTED);
-
-    EvaluationResult res = (EvaluationResult) p.run();
-    res.close();
+    PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
deleted file mode 100644
index 05e9125..0000000
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/RecoverFromCheckpointStreamingTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import kafka.serializer.StringDecoder;
-import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
-import org.apache.beam.runners.spark.io.KafkaIO;
-import 
org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
-import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import 
org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.Duration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Tests DStream recovery from checkpoint - recreate the job and continue 
(from checkpoint).
- *
- * <p>Tests Aggregators, which rely on Accumulators - Aggregators should be 
available, though
- * state is not preserved (Spark issue), so they start from initial value.
- * //TODO: after the runner supports recovering the state of Aggregators, 
update this test's
- * expected values for the recovered (second) run.
- */
-public class RecoverFromCheckpointStreamingTest {
-  private static final EmbeddedKafkaCluster.EmbeddedZookeeper 
EMBEDDED_ZOOKEEPER =
-      new EmbeddedKafkaCluster.EmbeddedZookeeper();
-  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
-      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new 
Properties());
-  private static final String TOPIC = "kafka_beam_test_topic";
-  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
-      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-  );
-  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", 
"k4,v4"};
-  private static final long EXPECTED_AGG_FIRST = 4L;
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
-
-  @BeforeClass
-  public static void init() throws IOException {
-    EMBEDDED_ZOOKEEPER.startup();
-    EMBEDDED_KAFKA_CLUSTER.startup();
-    /// this test actually requires to NOT reuse the context but rather to 
stop it and start again
-    // from the checkpoint with a brand new context.
-    System.setProperty("beam.spark.test.reuseSparkContext", "false");
-    // write to Kafka
-    Properties producerProps = new Properties();
-    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
-    producerProps.put("request.required.acks", 1);
-    producerProps.put("bootstrap.servers", 
EMBEDDED_KAFKA_CLUSTER.getBrokerList());
-    Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> 
kafkaProducer =
-        new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
-      for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
-        kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), 
en.getValue()));
-      }
-      kafkaProducer.close();
-    }
-  }
-
-  @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
-
-    // checkpoint after first (and only) interval.
-    options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
-
-    // first run will read from Kafka backlog - "auto.offset.reset=smallest"
-    EvaluationResult res = run(options);
-    res.close();
-    long processedMessages1 = res.getAggregatorValue("processedMessages", 
Long.class);
-    assertThat(String.format("Expected %d processed messages count but "
-        + "found %d", EXPECTED_AGG_FIRST, processedMessages1), 
processedMessages1,
-            equalTo(EXPECTED_AGG_FIRST));
-
-    // recovery should resume from last read offset, so nothing is read here.
-    res = runAgain(options);
-    res.close();
-    long processedMessages2 = res.getAggregatorValue("processedMessages", 
Long.class);
-    assertThat(String.format("Expected %d processed messages count but "
-        + "found %d", 0, processedMessages2), processedMessages2, equalTo(0L));
-  }
-
-  private static EvaluationResult runAgain(SparkPipelineOptions options) {
-    AccumulatorSingleton.clear();
-    // sleep before next run.
-    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-    return run(options);
-  }
-
-  private static EvaluationResult run(SparkPipelineOptions options) {
-    Map<String, String> kafkaParams = ImmutableMap.of(
-            "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
-            "auto.offset.reset", "smallest"
-    );
-    Pipeline p = Pipeline.create(options);
-    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(
-        StringDecoder.class, StringDecoder.class, String.class, String.class,
-            Collections.singleton(TOPIC), 
kafkaParams)).setCoder(KvCoder.of(StringUtf8Coder.of(),
-                StringUtf8Coder.of()));
-    PCollection<KV<String, String>> windowedWords = kafkaInput
-        .apply(Window.<KV<String, 
String>>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(
-        new FormatAsText()));
-
-    PAssertStreaming.assertContents(formattedKV, EXPECTED);
-
-    return  (EvaluationResult) p.run();
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    EMBEDDED_KAFKA_CLUSTER.shutdown();
-    EMBEDDED_ZOOKEEPER.shutdown();
-  }
-
-  private static class FormatAsText extends DoFn<KV<String, String>, String> {
-
-    private final Aggregator<Long, Long> aggregator =
-        createAggregator("processedMessages", new Sum.SumLongFn());
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      aggregator.addValue(1L);
-      String formatted = c.element().getKey() + "," + c.element().getValue();
-      c.output(formatted);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
new file mode 100644
index 0000000..fc7fa34
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import kafka.serializer.StringDecoder;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.io.KafkaIO;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
+import 
org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+/**
+ * Tests DStream recovery from checkpoint - recreate the job and continue 
(from checkpoint).
+ *
+ * <p>Tests Aggregators, which rely on Accumulators - Aggregators should be 
available, though
+ * state is not preserved (Spark issue), so they start from initial value.
+ * //TODO: after the runner supports recovering the state of Aggregators, 
update this test's
+ * expected values for the recovered (second) run.
+ */
+public class ResumeFromCheckpointStreamingTest {
+  private static final EmbeddedKafkaCluster.EmbeddedZookeeper 
EMBEDDED_ZOOKEEPER =
+      new EmbeddedKafkaCluster.EmbeddedZookeeper();
+  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
+      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new 
Properties());
+  private static final String TOPIC = "kafka_beam_test_topic";
+  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
+      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
+  );
+  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", 
"k4,v4"};
+  private static final long EXPECTED_AGG_FIRST = 4L;
+
+  @Rule
+  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+  @Rule
+  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    EMBEDDED_ZOOKEEPER.startup();
+    EMBEDDED_KAFKA_CLUSTER.startup();
+    /// this test actually requires to NOT reuse the context but rather to 
stop it and start again
+    // from the checkpoint with a brand new context.
+    System.setProperty("beam.spark.test.reuseSparkContext", "false");
+  }
+
+  private static void produce() {
+    Properties producerProps = new Properties();
+    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
+    producerProps.put("request.required.acks", 1);
+    producerProps.put("bootstrap.servers", 
EMBEDDED_KAFKA_CLUSTER.getBrokerList());
+    Serializer<String> stringSerializer = new StringSerializer();
+    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> 
kafkaProducer =
+        new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
+          for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
+            kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), 
en.getValue()));
+          }
+          kafkaProducer.close();
+        }
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
+
+    // checkpoint after first (and only) interval.
+    options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
+
+    // first run will read from Kafka backlog - "auto.offset.reset=smallest"
+    EvaluationResult res = run(options);
+    long processedMessages1 = res.getAggregatorValue("processedMessages", 
Long.class);
+    assertThat(String.format("Expected %d processed messages count but "
+        + "found %d", EXPECTED_AGG_FIRST, processedMessages1), 
processedMessages1,
+            equalTo(EXPECTED_AGG_FIRST));
+
+    // recovery should resume from last read offset, and read the second batch 
of input.
+    res = runAgain(options);
+    long processedMessages2 = res.getAggregatorValue("processedMessages", 
Long.class);
+    assertThat(String.format("Expected %d processed messages count but "
+        + "found %d", EXPECTED_AGG_FIRST, processedMessages2), 
processedMessages2,
+            equalTo(EXPECTED_AGG_FIRST));
+  }
+
+  private static EvaluationResult runAgain(SparkPipelineOptions options) {
+    AccumulatorSingleton.clear();
+    // sleep before next run.
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    return run(options);
+  }
+
+  private static EvaluationResult run(SparkPipelineOptions options) {
+    // write to Kafka
+    produce();
+    Map<String, String> kafkaParams = ImmutableMap.of(
+            "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
+            "auto.offset.reset", "smallest"
+    );
+    Pipeline p = Pipeline.create(options);
+    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(
+        StringDecoder.class, StringDecoder.class, String.class, String.class,
+            Collections.singleton(TOPIC), 
kafkaParams)).setCoder(KvCoder.of(StringUtf8Coder.of(),
+                StringUtf8Coder.of()));
+    PCollection<KV<String, String>> windowedWords = kafkaInput
+        .apply(Window.<KV<String, 
String>>into(FixedWindows.of(Duration.standardSeconds(1))));
+    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(
+        new FormatAsText()));
+
+    // requires a graceful stop so that checkpointing of the first run would 
finish successfully
+    // before stopping and attempting to resume.
+    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, 
true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EMBEDDED_KAFKA_CLUSTER.shutdown();
+    EMBEDDED_ZOOKEEPER.shutdown();
+  }
+
+  private static class FormatAsText extends DoFn<KV<String, String>, String> {
+
+    private final Aggregator<Long, Long> aggregator =
+        createAggregator("processedMessages", new Sum.SumLongFn());
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      aggregator.addValue(1L);
+      String formatted = c.element().getKey() + "," + c.element().getValue();
+      c.output(formatted);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index d505878..8f2dde3 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
@@ -82,9 +81,6 @@ public class SimpleStreamingWordCountTest implements 
Serializable {
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
-    PAssertStreaming.assertContents(output, EXPECTED_WORD_COUNTS);
-
-    EvaluationResult res = (EvaluationResult) pipeline.run();
-    res.close();
+    PAssertStreaming.runAndAssertContents(pipeline, output, 
EXPECTED_WORD_COUNTS);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44225cff/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 97425bd..3bf1ef6 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -18,58 +18,99 @@
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Since PAssert doesn't propagate assert exceptions, use Aggregators to 
assert streaming
  * success/failure counters.
  */
 public final class PAssertStreaming implements Serializable {
-
-  /**
-   * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}.
-   */
-  static final String SUCCESS_COUNTER = "PAssertSuccess";
-  static final String FAILURE_COUNTER = "PAssertFailure";
+  private static final Logger LOG = 
LoggerFactory.getLogger(PAssertStreaming.class);
 
   private PAssertStreaming() {
   }
 
-  public static void assertNoFailures(EvaluationResult res) {
-    int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class);
-    Assert.assertEquals("Found " + failures + " failures, see the log for 
details", 0, failures);
-  }
-
   /**
    * Adds a pipeline run-time assertion that the contents of {@code actual} 
are {@code expected}.
    * Note that it is oblivious to windowing, so the assertion will apply 
indiscriminately to all
    * windows.
    */
-  public static <T> void assertContents(PCollection<T> actual, final T[] 
expected) {
+  public static <T> EvaluationResult runAndAssertContents(Pipeline p,
+                                                          PCollection<T> 
actual,
+                                                          T[] expected,
+                                                          boolean 
stopGracefully) {
     // Because PAssert does not support non-global windowing, but all our data 
is in one window,
     // we set up the assertion directly.
     actual
         .apply(WithKeys.<String, T>of("dummy"))
         .apply(GroupByKey.<String, T>create())
         .apply(Values.<Iterable<T>>create())
-        .apply(
-            MapElements.via(
-                new SimpleFunction<Iterable<T>, Void>() {
-                  @Override
-                  public Void apply(Iterable<T> input) {
-                    assertThat(input, containsInAnyOrder(expected));
-                    return null;
-                  }
-                }));
+        .apply(ParDo.of(new AssertDoFn<>(expected)));
+
+    // run the pipeline.
+    EvaluationResult res = (EvaluationResult) p.run();
+    res.close(stopGracefully);
+    // validate assertion succeeded (at least once).
+    int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, 
Integer.class);
+    Assert.assertThat("Success aggregator should be greater than zero.", 
success, not(0));
+    // validate assertion didn't fail.
+    int failure = res.getAggregatorValue(PAssert.FAILURE_COUNTER, 
Integer.class);
+    Assert.assertThat("Failure aggregator should be zero.", failure, is(0));
+
+    LOG.info("PAssertStreaming had {} successful assertion and {} failed.", 
success, failure);
+    return res;
+  }
+
+  /**
+   * Default to stop immediately, useful for most tests except for the once 
that may require
+   * to finish writing checkpoints for example.
+   */
+  public static <T> EvaluationResult runAndAssertContents(Pipeline p,
+                                                          PCollection<T> 
actual,
+                                                          T[] expected) {
+    return runAndAssertContents(p, actual, expected, false);
+  }
+
+  private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {
+    private final Aggregator<Integer, Integer> success =
+        createAggregator(PAssert.SUCCESS_COUNTER, new Sum.SumIntegerFn());
+    private final Aggregator<Integer, Integer> failure =
+        createAggregator(PAssert.FAILURE_COUNTER, new Sum.SumIntegerFn());
+    private final T[] expected;
+
+    AssertDoFn(T[] expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      try {
+        assertThat(c.element(), containsInAnyOrder(expected));
+        success.addValue(1);
+      } catch (Throwable t) {
+        failure.addValue(1);
+        LOG.error("PAssert failed expectations.", t);
+        // don't throw t because it will fail this bundle and the failure 
count will be lost.
+      }
+    }
   }
 }

Reply via email to