Fix streaming translation of Flatten and Window, make CreateStream eager.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f143504
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f143504
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f143504

Branch: refs/heads/master
Commit: 9f143504f7ade5bb0a966dcef645a834277632ef
Parents: 96abe4f
Author: Sela <[email protected]>
Authored: Sat Feb 18 22:01:58 2017 +0200
Committer: Sela <[email protected]>
Committed: Wed Mar 1 00:17:59 2017 +0200

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |   6 --
 .../streaming/StreamingTransformTranslator.java | 101 +++++++++----------
 .../translation/streaming/UnboundedDataset.java |  50 +--------
 3 files changed, 52 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f143504/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index a35aff2..329e047 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -136,11 +135,6 @@ public class EvaluationContext {
     datasets.put(getOutput(transform), new BoundedDataset<>(values, jsc, 
coder));
   }
 
-  public <T> void putUnboundedDatasetFromQueue(
-      PTransform<?, ? extends PValue> transform, Iterable<Iterable<T>> values, 
Coder<T> coder) {
-    datasets.put(getOutput(transform), new UnboundedDataset<>(values, jssc, 
coder));
-  }
-
   public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
     return borrowDataset(getInput(transform));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f143504/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index e90b490..7abf5be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -22,12 +22,18 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource;
@@ -48,6 +54,7 @@ import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -60,8 +67,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
@@ -76,10 +81,10 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 
 /**
@@ -116,13 +121,35 @@ final class StreamingTransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> 
createFromQueue() {
-    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+  private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
+    return new TransformEvaluator<CreateStream<T>>() {
       @Override
-      public void evaluate(CreateStream.QueuedValues<T> transform, 
EvaluationContext context) {
-        Iterable<Iterable<T>> values = transform.getQueuedValues();
+      public void evaluate(CreateStream<T> transform, EvaluationContext 
context) {
         Coder<T> coder = context.getOutput(transform).getCoder();
-        context.putUnboundedDatasetFromQueue(transform, values, coder);
+        JavaStreamingContext jssc = context.getStreamingContext();
+        Queue<Iterable<T>> values = transform.getBatches();
+        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+            WindowedValue.getValueOnlyCoder(coder);
+        // create the DStream from queue.
+        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new 
LinkedBlockingQueue<>();
+        for (Iterable<T> v : values) {
+          Iterable<WindowedValue<T>> windowedValues =
+              Iterables.transform(v, 
WindowingHelpers.<T>windowValueFunction());
+          JavaRDD<WindowedValue<T>> rdd =
+              jssc.sparkContext()
+                  .parallelize(CoderHelpers.toByteArrays(windowedValues, 
windowCoder))
+                  .map(CoderHelpers.fromByteFunction(windowCoder));
+          rddQueue.offer(rdd);
+        }
+
+        JavaInputDStream<WindowedValue<T>> inputDStream = 
jssc.queueStream(rddQueue, true);
+        UnboundedDataset<T> unboundedDataset = new UnboundedDataset<T>(
+            inputDStream, 
Collections.singletonList(inputDStream.inputDStream().id()));
+        // add pre-baked Watermarks for the pre-baked batches.
+        Queue<GlobalWatermarkHolder.SparkWatermarks> times = 
transform.getTimes();
+        GlobalWatermarkHolder.addAll(
+            ImmutableMap.of(unboundedDataset.getStreamSources().get(0), 
times));
+        context.putDataset(transform, unboundedDataset);
       }
     };
   }
@@ -136,7 +163,6 @@ final class StreamingTransformTranslator {
         // since this is a streaming pipeline, at least one of the 
PCollections to "flatten" are
         // unbounded, meaning it represents a DStream.
         // So we could end up with an unbounded unified DStream.
-        final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
         final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
         final List<Integer> streamingSources = new ArrayList<>();
         for (TaggedPValue pv : pcs) {
@@ -152,27 +178,18 @@ final class StreamingTransformTranslator {
             streamingSources.addAll(unboundedDataset.getStreamSources());
             dStreams.add(unboundedDataset.getDStream());
           } else {
-            rdds.add(((BoundedDataset<T>) dataset).getRDD());
+            // create a single RDD stream.
+            Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+            q.offer(((BoundedDataset) dataset).getRDD());
+            //TODO: this is not recoverable from checkpoint!
+            JavaDStream<WindowedValue<T>> dStream = 
context.getStreamingContext().queueStream(q);
+            dStreams.add(dStream);
           }
         }
         // start by unifying streams into a single stream.
         JavaDStream<WindowedValue<T>> unifiedStreams =
             context.getStreamingContext().union(dStreams.remove(0), dStreams);
-        // now unify in RDDs.
-        if (rdds.size() > 0) {
-          JavaDStream<WindowedValue<T>> joined =
-              unifiedStreams.transform(
-                  new Function<JavaRDD<WindowedValue<T>>, 
JavaRDD<WindowedValue<T>>>() {
-                    @Override
-                    public JavaRDD<WindowedValue<T>> 
call(JavaRDD<WindowedValue<T>> streamRdd)
-                        throws Exception {
-                      return new 
JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
-                    }
-                  });
-          context.putDataset(transform, new UnboundedDataset<>(joined, 
streamingSources));
-        } else {
-          context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, 
streamingSources));
-        }
+        context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, 
streamingSources));
       }
     };
   }
@@ -182,42 +199,24 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(final Window.Bound<T> transform, EvaluationContext 
context) {
         @SuppressWarnings("unchecked")
-        WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) 
transform.getWindowFn();
-        @SuppressWarnings("unchecked")
         UnboundedDataset<T> unboundedDataset =
             ((UnboundedDataset<T>) context.borrowDataset(transform));
         JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
-        // get the right window durations.
-        Duration windowDuration;
-        Duration slideDuration;
-        if (windowFn instanceof FixedWindows) {
-          windowDuration = Durations.milliseconds(((FixedWindows) 
windowFn).getSize().getMillis());
-          slideDuration = windowDuration;
-        } else if (windowFn instanceof SlidingWindows) {
-          SlidingWindows slidingWindows = (SlidingWindows) windowFn;
-          windowDuration = 
Durations.milliseconds(slidingWindows.getSize().getMillis());
-          slideDuration = 
Durations.milliseconds(slidingWindows.getPeriod().getMillis());
-        } else {
-          throw new UnsupportedOperationException(String.format("WindowFn %s 
is not supported.",
-              windowFn.getClass().getCanonicalName()));
-        }
-        JavaDStream<WindowedValue<T>> windowedDStream =
-            dStream.window(windowDuration, slideDuration);
-        //--- then we apply windowing to the elements
+        JavaDStream<WindowedValue<T>> outputStream;
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          context.putDataset(transform,
-              new UnboundedDataset<>(windowedDStream, 
unboundedDataset.getStreamSources()));
+          // do nothing.
+          outputStream = dStream;
         } else {
-          JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
+          outputStream = dStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, 
JavaRDD<WindowedValue<T>>>() {
             @Override
             public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> 
rdd) throws Exception {
               return rdd.map(new 
SparkAssignWindowFn<>(transform.getWindowFn()));
             }
           });
-          context.putDataset(transform,
-              new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamSources()));
         }
+        context.putDataset(transform,
+            new UnboundedDataset<>(outputStream, 
unboundedDataset.getStreamSources()));
       }
     };
   }
@@ -427,7 +426,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
-    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(CreateStream.class, createFromQueue());
     EVALUATORS.put(Window.Bound.class, window());
     EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f143504/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index 8624f41..d7f3f34 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -18,22 +18,12 @@
 
 package org.apache.beam.runners.spark.translation.streaming;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.translation.Dataset;
-import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +34,8 @@ import org.slf4j.LoggerFactory;
 public class UnboundedDataset<T> implements Dataset {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedDataset.class);
-  private static final AtomicInteger queuedStreamIds = new AtomicInteger();
 
-  // only set if creating a DStream from a static collection
-  @Nullable private transient JavaStreamingContext jssc;
-
-  private Iterable<Iterable<T>> values;
-  private Coder<T> coder;
-  private JavaDStream<WindowedValue<T>> dStream;
+  private final JavaDStream<WindowedValue<T>> dStream;
   // points to the input streams that created this UnboundedDataset,
   // should be greater > 1 in case of Flatten for example.
   // when using GlobalWatermarkHolder this information helps to take only the 
relevant watermarks
@@ -63,41 +47,11 @@ public class UnboundedDataset<T> implements Dataset {
     this.streamSources.addAll(streamSources);
   }
 
-  public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext 
jssc, Coder<T> coder) {
-    this.values = values;
-    this.jssc = jssc;
-    this.coder = coder;
-    // QueuedStream will have a negative (decreasing) unique id.
-    this.streamSources.add(queuedStreamIds.decrementAndGet());
-  }
-
-  @VisibleForTesting
-  public static void resetQueuedStreamIds() {
-    queuedStreamIds.set(0);
-  }
-
-  @SuppressWarnings("ConstantConditions")
   JavaDStream<WindowedValue<T>> getDStream() {
-    if (dStream == null) {
-      WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-          WindowedValue.getValueOnlyCoder(coder);
-      // create the DStream from queue
-      Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-      for (Iterable<T> v : values) {
-        Iterable<WindowedValue<T>> windowedValues =
-            Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
-        JavaRDD<WindowedValue<T>> rdd = jssc.sc().parallelize(
-            CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
-            CoderHelpers.fromByteFunction(windowCoder));
-        rddQueue.offer(rdd);
-      }
-      // create DStream from queue, one at a time.
-      dStream = jssc.queueStream(rddQueue, true);
-    }
     return dStream;
   }
 
-  public List<Integer> getStreamSources() {
+  List<Integer> getStreamSources() {
     return streamSources;
   }
 

Reply via email to