Handle QueuedStream (for testing) and track sources upstream.

Refactor the streaming translation and UnboundedDataset accordingly.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/705695eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/705695eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/705695eb

Branch: refs/heads/master
Commit: 705695eb726acf086915e610cb2304bd968e3682
Parents: fa31f18
Author: Sela <[email protected]>
Authored: Sun Feb 12 18:32:06 2017 +0200
Committer: Sela <[email protected]>
Committed: Mon Feb 20 11:30:14 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/TestSparkRunner.java     | 11 ++-
 .../aggregators/AggregatorsAccumulator.java     |  2 +-
 .../spark/translation/SparkContextFactory.java  |  2 +-
 .../streaming/StreamingTransformTranslator.java | 92 +++++++++++++-------
 .../translation/streaming/UnboundedDataset.java | 29 ++++--
 5 files changed, 88 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e770164..a634dd4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -22,6 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -107,13 +109,14 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = 
pipeline.getOptions().as(TestPipelineOptions.class);
-
-    // clear metrics singleton
-    SparkMetricsContainer.clear();
-
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
 
+    // clear state of Aggregators, Metrics and Watermarks.
+    AggregatorsAccumulator.clear();
+    SparkMetricsContainer.clear();
+    GlobalWatermarkHolder.clear();
+
     // make sure the test pipeline finished successfully.
     State resultState = result.getState();
     assertThat(

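For context, a minimal sketch (not part of this commit) of the reset-between-runs pattern this change enables: with the clear() methods public, consecutive test pipelines in the same JVM can each start from clean aggregator, metric and watermark state. The delegate runner and pipelines below are assumed, as in TestSparkRunner.

import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;

class ResetBetweenRunsSketch {
  static void runTwice(SparkRunner delegate, Pipeline first, Pipeline second) {
    SparkPipelineResult result = delegate.run(first);
    result.waitUntilFinish();

    // Reset JVM-wide singletons so state from the first run does not leak
    // into the second when the Spark context is reused.
    AggregatorsAccumulator.clear();
    SparkMetricsContainer.clear();
    GlobalWatermarkHolder.clear();

    result = delegate.run(second);
    result.waitUntilFinish();
  }
}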
http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index 1b49e91..a4dfda6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -89,7 +89,7 @@ public class AggregatorsAccumulator {
   }
 
   @VisibleForTesting
-  static void clear() {
+  public static void clear() {
     synchronized (AggregatorsAccumulator.class) {
       instance = null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 326838a..cdeddad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -38,7 +38,7 @@ public final class SparkContextFactory {
    * {@code true} then the Spark context will be reused for beam pipelines.
    * This property should only be enabled for tests.
    */
-  static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
+  public static final String TEST_REUSE_SPARK_CONTEXT = 
"beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
   private static JavaSparkContext sparkContext;

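As a side note, a short sketch of how a test might opt in to context reuse now that the property name is public (assuming the property is set before the first pipeline creates its SparkContext):

import org.apache.beam.runners.spark.translation.SparkContextFactory;

class ReuseContextSketch {
  static void enableReuseForTests() {
    // Tests only: keep a single JVM-wide SparkContext across Beam pipelines.
    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
  }
}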
http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index a49b959..9451df7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -105,9 +105,12 @@ final class StreamingTransformTranslator {
     return new TransformEvaluator<Read.Unbounded<T>>() {
       @Override
       public void evaluate(Read.Unbounded<T> transform, EvaluationContext 
context) {
-        context.putDataset(transform,
-            new 
UnboundedDataset<>(SparkUnboundedSource.read(context.getStreamingContext(),
-                context.getRuntimeContext(), transform.getSource())));
+        context.putDataset(
+            transform,
+            SparkUnboundedSource.read(
+                context.getStreamingContext(),
+                context.getRuntimeContext(),
+                transform.getSource()));
       }
     };
   }
@@ -134,6 +137,7 @@ final class StreamingTransformTranslator {
         // So we could end up with an unbounded unified DStream.
         final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
         final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+        final List<Integer> streamingSources = new ArrayList<>();
         for (TaggedPValue pv : pcs) {
           checkArgument(
               pv.getValue() instanceof PCollection,
@@ -143,7 +147,9 @@ final class StreamingTransformTranslator {
           PCollection<T> pcol = (PCollection<T>) pv.getValue();
           Dataset dataset = context.borrowDataset(pcol);
           if (dataset instanceof UnboundedDataset) {
-            dStreams.add(((UnboundedDataset<T>) dataset).getDStream());
+            UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) 
dataset;
+            streamingSources.addAll(unboundedDataset.getStreamingSources());
+            dStreams.add(unboundedDataset.getDStream());
           } else {
             rdds.add(((BoundedDataset<T>) dataset).getRDD());
           }
@@ -162,9 +168,9 @@ final class StreamingTransformTranslator {
                       return new 
JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
                     }
                   });
-          context.putDataset(transform, new UnboundedDataset<>(joined));
+          context.putDataset(transform, new UnboundedDataset<>(joined, 
streamingSources));
         } else {
-          context.putDataset(transform, new 
UnboundedDataset<>(unifiedStreams));
+          context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, 
streamingSources));
         }
       }
     };
@@ -177,8 +183,9 @@ final class StreamingTransformTranslator {
         @SuppressWarnings("unchecked")
         WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) 
transform.getWindowFn();
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<T>> dStream =
-            ((UnboundedDataset<T>) 
context.borrowDataset(transform)).getDStream();
+        UnboundedDataset<T> unboundedDataset =
+            ((UnboundedDataset<T>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
         // get the right window durations.
         Duration windowDuration;
         Duration slideDuration;
@@ -197,7 +204,8 @@ final class StreamingTransformTranslator {
             dStream.window(windowDuration, slideDuration);
         //--- then we apply windowing to the elements
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          context.putDataset(transform, new 
UnboundedDataset<>(windowedDStream));
+          context.putDataset(transform,
+              new UnboundedDataset<>(windowedDStream, 
unboundedDataset.getStreamingSources()));
         } else {
           JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, 
JavaRDD<WindowedValue<T>>>() {
@@ -206,7 +214,8 @@ final class StreamingTransformTranslator {
               return rdd.map(new 
SparkAssignWindowFn<>(transform.getWindowFn()));
             }
           });
-          context.putDataset(transform, new UnboundedDataset<>(outStream));
+          context.putDataset(transform,
+              new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamingSources()));
         }
       }
     };
@@ -217,8 +226,9 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(GroupByKey<K, V> transform, EvaluationContext 
context) {
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<KV<K, V>>> dStream =
-            ((UnboundedDataset<KV<K, V>>) 
context.borrowDataset(transform)).getDStream();
+        UnboundedDataset<KV<K, V>> unboundedDataset =
+            ((UnboundedDataset<KV<K, V>>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<KV<K, V>>> dStream = 
unboundedDataset.getDStream();
 
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) 
context.getInput(transform).getCoder();
@@ -239,7 +249,8 @@ final class StreamingTransformTranslator {
                 windowingStrategy);
           }
         });
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -247,7 +258,6 @@ final class StreamingTransformTranslator {
   private static <K, InputT, OutputT> 
TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
   combineGrouped() {
     return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() 
{
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final Combine.GroupedValues<K, InputT, OutputT> 
transform,
                            EvaluationContext context) {
@@ -255,13 +265,15 @@ final class StreamingTransformTranslator {
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
             context.getInput(transform);
         final WindowingStrategy<?, ?> windowingStrategy = 
input.getWindowingStrategy();
+        @SuppressWarnings("unchecked")
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, 
OutputT> fn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, 
OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
 
-        JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
-            ((UnboundedDataset<KV<K, Iterable<InputT>>>) 
context.borrowDataset(transform))
-                .getDStream();
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<KV<K, Iterable<InputT>>> unboundedDataset =
+            ((UnboundedDataset<KV<K, Iterable<InputT>>>) 
context.borrowDataset(transform));
+        JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = 
unboundedDataset.getDStream();
 
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final SparkPCollectionView pviews = context.getPViews();
@@ -283,7 +295,8 @@ final class StreamingTransformTranslator {
                   }
                 });
 
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -292,7 +305,6 @@ final class StreamingTransformTranslator {
   combineGlobally() {
     return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
 
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(
           final Combine.Globally<InputT, OutputT> transform,
@@ -301,6 +313,7 @@ final class StreamingTransformTranslator {
         // serializable arguments to pass.
         final Coder<InputT> iCoder = context.getInput(transform).getCoder();
         final Coder<OutputT> oCoder = context.getOutput(transform).getCoder();
+        @SuppressWarnings("unchecked")
         final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> 
combineFn =
             (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
@@ -309,8 +322,10 @@ final class StreamingTransformTranslator {
         final boolean hasDefault = transform.isInsertDefault();
         final SparkPCollectionView pviews = context.getPViews();
 
-        JavaDStream<WindowedValue<InputT>> dStream =
-            ((UnboundedDataset<InputT>) 
context.borrowDataset(transform)).getDStream();
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<InputT> unboundedDataset =
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<InputT>> dStream = 
unboundedDataset.getDStream();
 
         JavaDStream<WindowedValue<OutputT>> outStream = dStream.transform(
             new Function<JavaRDD<WindowedValue<InputT>>, 
JavaRDD<WindowedValue<OutputT>>>() {
@@ -326,7 +341,8 @@ final class StreamingTransformTranslator {
           }
         });
 
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -334,7 +350,6 @@ final class StreamingTransformTranslator {
   private static <K, InputT, AccumT, OutputT>
   TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
     return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final Combine.PerKey<K, InputT, OutputT> transform,
                            final EvaluationContext context) {
@@ -342,6 +357,7 @@ final class StreamingTransformTranslator {
         // serializable arguments to pass.
         final KvCoder<K, InputT> inputCoder =
             (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+        @SuppressWarnings("unchecked")
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, 
OutputT> combineFn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, 
OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
@@ -349,8 +365,10 @@ final class StreamingTransformTranslator {
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final SparkPCollectionView pviews = context.getPViews();
 
-        JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
-            ((UnboundedDataset<KV<K, InputT>>) 
context.borrowDataset(transform)).getDStream();
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<KV<K, InputT>> unboundedDataset =
+            ((UnboundedDataset<KV<K, InputT>>) 
context.borrowDataset(transform));
+        JavaDStream<WindowedValue<KV<K, InputT>>> dStream = 
unboundedDataset.getDStream();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream =
             dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, 
InputT>>>,
@@ -366,14 +384,14 @@ final class StreamingTransformTranslator {
                 windowingStrategy, sideInputs);
           }
         });
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamingSources()));
       }
     };
   }
 
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, 
OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
                            final EvaluationContext context) {
@@ -384,8 +402,11 @@ final class StreamingTransformTranslator {
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
         final SparkPCollectionView pviews = context.getPViews();
-        JavaDStream<WindowedValue<InputT>> dStream =
-            ((UnboundedDataset<InputT>) 
context.borrowDataset(transform)).getDStream();
+
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<InputT> unboundedDataset =
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<InputT>> dStream = 
unboundedDataset.getDStream();
 
         final String stepName = context.getCurrentTransform().getFullName();
 
@@ -409,7 +430,8 @@ final class StreamingTransformTranslator {
           }
         });
 
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, 
unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -427,9 +449,12 @@ final class StreamingTransformTranslator {
         final SparkPCollectionView pviews = context.getPViews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
+
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<InputT>> dStream =
-            ((UnboundedDataset<InputT>) 
context.borrowDataset(transform)).getDStream();
+        UnboundedDataset<InputT> unboundedDataset =
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<InputT>> dStream = 
unboundedDataset.getDStream();
+
         JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = 
dStream.transformToPair(
             new Function<JavaRDD<WindowedValue<InputT>>,
                 JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
@@ -460,7 +485,8 @@ final class StreamingTransformTranslator {
           JavaDStream<WindowedValue<Object>> values =
               (JavaDStream<WindowedValue<Object>>)
                   (JavaDStream<?>) TranslationUtils.dStreamValues(filtered);
-          context.putDataset(e.getValue(), new UnboundedDataset<>(values));
+          context.putDataset(e.getValue(),
+              new UnboundedDataset<>(values, 
unboundedDataset.getStreamingSources()));
         }
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index d059c7e..80c0515 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -19,8 +19,12 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.translation.Dataset;
@@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedDataset<T> implements Dataset {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedDataset.class);
+  private static final AtomicInteger queuedStreamIds = new AtomicInteger();
 
   // only set if creating a DStream from a static collection
   @Nullable private transient JavaStreamingContext jssc;
@@ -48,15 +53,23 @@ public class UnboundedDataset<T> implements Dataset {
   private Iterable<Iterable<T>> values;
   private Coder<T> coder;
   private JavaDStream<WindowedValue<T>> dStream;
+  // Points to the input streams that created this UnboundedDataset;
+  // there may be more than one, e.g. in the case of Flatten.
+  // When using GlobalWatermarkHolder this information helps to take only the
+  // relevant watermarks and reason about them accordingly.
+  private final List<Integer> streamingSources = new ArrayList<>();
 
-  UnboundedDataset(JavaDStream<WindowedValue<T>> dStream) {
+  public UnboundedDataset(JavaDStream<WindowedValue<T>> dStream, List<Integer> 
streamingSources) {
     this.dStream = dStream;
+    this.streamingSources.addAll(streamingSources);
   }
 
   public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext 
jssc, Coder<T> coder) {
     this.values = values;
     this.jssc = jssc;
     this.coder = coder;
+    // QueuedStream will have a negative (decreasing) unique id.
+    this.streamingSources.add(queuedStreamIds.decrementAndGet());
   }
 
   @SuppressWarnings("ConstantConditions")
@@ -66,7 +79,6 @@ public class UnboundedDataset<T> implements Dataset {
           WindowedValue.getValueOnlyCoder(coder);
       // create the DStream from queue
       Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-      JavaRDD<WindowedValue<T>> lastRDD = null;
       for (Iterable<T> v : values) {
         Iterable<WindowedValue<T>> windowedValues =
             Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
@@ -74,18 +86,17 @@ public class UnboundedDataset<T> implements Dataset {
             CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
             CoderHelpers.fromByteFunction(windowCoder));
         rddQueue.offer(rdd);
-        lastRDD = rdd;
       }
-      // create DStream from queue, one at a time,
-      // with last as default in case batches repeat (graceful stops for 
example).
-      // if the stream is empty, avoid creating a default empty RDD.
-      // mainly for unit test so no reason to have this configurable.
-      dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
-          : jssc.queueStream(rddQueue, true);
+      // create DStream from queue, one at a time.
+      dStream = jssc.queueStream(rddQueue, true);
     }
     return dStream;
   }
 
+  public List<Integer> getStreamingSources() {
+    return streamingSources;
+  }
+
   public void cache() {
     dStream.cache();
   }

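To illustrate the new source tracking, a hypothetical usage sketch (not from the commit), placed in the same package so package-private accessors resolve: a queued test dataset receives a synthetic negative source id, and a dataset derived from it carries the same ids forward so GlobalWatermarkHolder lookups can stay scoped to the relevant streams.

package org.apache.beam.runners.spark.translation.streaming;

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

class UnboundedDatasetSketch {
  static void example(JavaStreamingContext jssc) {
    // Queued (test) input: two micro-batches of strings.
    UnboundedDataset<String> queued =
        new UnboundedDataset<>(
            Arrays.<Iterable<String>>asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c")),
            jssc,
            StringUtf8Coder.of());

    // The queued stream was assigned a unique negative id, e.g. [-1].
    List<Integer> sources = queued.getStreamingSources();

    // A derived dataset keeps pointing at the same upstream sources.
    UnboundedDataset<String> windowed =
        new UnboundedDataset<>(
            queued.getDStream().window(new Duration(1000L)), sources);
  }
}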