Repository: incubator-beam
Updated Branches:
  refs/heads/master 843275210 -> 6082ebcce


[BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows.

Revised the test to cover multiple batches.

Set the timeout to 1 ms since it essentially plays no role here.
Removed blank lines between imports.

Refactored the timeout-related code to make it more natural from the Beam
model's perspective.

Fix windowing bug.

Expected result is for the entire window.

Renamed the test's name to better reflect the use case it's testing.

Fixed a typo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1474a18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1474a18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1474a18

Branch: refs/heads/master
Commit: b1474a18c4fe3b3aefdb6cd364fce9dfc227b6df
Parents: 8432752
Author: Stas Levin <stasle...@gmail.com>
Authored: Mon Sep 5 18:22:59 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Thu Sep 22 18:18:19 2016 +0300

----------------------------------------------------------------------
 .../streaming/StreamingTransformTranslator.java | 28 +++++------
 .../streaming/SimpleStreamingWordCountTest.java | 49 +++++++++++++-------
 2 files changed, 46 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 64ddc57..9cb377d 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -191,27 +191,29 @@ public final class StreamingTransformTranslator {
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<T>> dStream =
             (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        // get the right window durations.
+        Duration windowDuration;
+        Duration slideDuration;
         if (windowFn instanceof FixedWindows) {
-          Duration windowDuration = Durations.milliseconds(((FixedWindows) 
windowFn).getSize()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration));
+          windowDuration = Durations.milliseconds(((FixedWindows) 
windowFn).getSize().getMillis());
+          slideDuration = windowDuration;
         } else if (windowFn instanceof SlidingWindows) {
-          Duration windowDuration = Durations.milliseconds(((SlidingWindows) 
windowFn).getSize()
-              .getMillis());
-          Duration slideDuration = Durations.milliseconds(((SlidingWindows) 
windowFn).getPeriod()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration, 
slideDuration));
+          SlidingWindows slidingWindows = (SlidingWindows) windowFn;
+          windowDuration = 
Durations.milliseconds(slidingWindows.getSize().getMillis());
+          slideDuration = 
Durations.milliseconds(slidingWindows.getPeriod().getMillis());
+        } else {
+          throw new UnsupportedOperationException(String.format("WindowFn %s 
is not supported.",
+              windowFn.getClass().getCanonicalName()));
         }
+        JavaDStream<WindowedValue<T>> windowedDStream =
+            dStream.window(windowDuration, slideDuration);
         //--- then we apply windowing to the elements
-        @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<T>> dStream2 =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          sec.setStream(transform, dStream2);
+          sec.setStream(transform, windowedDStream);
         } else {
           final OldDoFn<T, T> addWindowsDoFn = new 
AssignWindowsDoFn<>(windowFn);
           final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
-          JavaDStream<WindowedValue<T>> outStream = dStream2.transform(
+          JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, 
JavaRDD<WindowedValue<T>>>() {
             @Override
             public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> 
rdd) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 1464273..d505878 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -17,10 +17,9 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-
+import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -45,33 +44,47 @@ import org.junit.rules.TemporaryFolder;
  */
 public class SimpleStreamingWordCountTest implements Serializable {
 
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
-  private static final List<Iterable<String>> WORDS_QUEUE =
-      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
-  private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 
2", "bob: 2"};
-
   @Rule
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
   public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
 
+  private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi 
sue", "", "bob hi"};
+
+  private static final List<Iterable<String>> MANY_WORDS =
+      Lists.<Iterable<String>>newArrayList(Arrays.asList(WORDS), 
Arrays.asList(WORDS));
+
+  private static final String[] EXPECTED_WORD_COUNTS = {"hi: 10", "there: 2", 
"sue: 4", "bob: 4"};
+
+  private static final Duration BATCH_INTERVAL = Duration.standardSeconds(1);
+
+  private static final Duration windowDuration = 
BATCH_INTERVAL.multipliedBy(2);
+
   @Test
-  public void testRun() throws Exception {
+  public void testFixedWindows() throws Exception {
+
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
         checkpointParentDir.newFolder(getClass().getSimpleName()));
 
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords =
-        
p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedWords = inputWords
-        
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> output = windowedWords.apply(new 
WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+    // override defaults
+    options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
+    // graceful stop is on, so no worries about the timeout and window being 
equal
+    options.setTimeout(windowDuration.getMillis());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> output =
+        pipeline
+            .apply(CreateStream.fromQueue(MANY_WORDS))
+            .setCoder(StringUtf8Coder.of())
+            .apply(Window.<String>into(FixedWindows.of(windowDuration)))
+            .apply(new WordCount.CountWords())
+            .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+    PAssertStreaming.assertContents(output, EXPECTED_WORD_COUNTS);
 
-    PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
-    EvaluationResult res = (EvaluationResult) p.run();
+    EvaluationResult res = (EvaluationResult) pipeline.run();
     res.close();
   }
 }

Reply via email to