Repository: beam Updated Branches: refs/heads/master 1fd52f53c -> c79bd95bd
[BEAM-1636] UnboundedDataset action() does not materialize RDD Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a889597e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a889597e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a889597e Branch: refs/heads/master Commit: a889597e748eb752141af8dc568c56449c4eba5c Parents: 1fd52f5 Author: Aviem Zur <[email protected]> Authored: Tue Mar 7 15:07:03 2017 +0200 Committer: Aviem Zur <[email protected]> Committed: Tue Mar 7 15:07:03 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/translation/BoundedDataset.java | 7 +------ .../beam/runners/spark/translation/TranslationUtils.java | 8 ++++++++ .../spark/translation/streaming/UnboundedDataset.java | 11 ++++++++++- 3 files changed, 19 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a889597e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 7db04a8..6e4ffc7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.storage.StorageLevel; /** @@ -106,11 +105,7 @@ public class BoundedDataset<T> implements Dataset { @Override public void action() { // Empty function to force computation of RDD. - rdd.foreach(new VoidFunction<WindowedValue<T>>() { - @Override public void call(WindowedValue<T> tWindowedValue) throws Exception { - // Empty implementation. - } - }); + rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a889597e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index f2b3418..8545b36 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -267,4 +268,11 @@ public final class TranslationUtils { } } + public static <T> VoidFunction<T> emptyVoidFunction() { + return new VoidFunction<T>() { + @Override public void call(T t) throws Exception { + // Empty implementation. + } + }; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a889597e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java index e9abe93..ccdaf11 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -21,7 +21,10 @@ package org.apache.beam.runners.spark.translation.streaming; import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.spark.translation.Dataset; +import org.apache.beam.runners.spark.translation.TranslationUtils; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,11 +71,17 @@ public class UnboundedDataset<T> implements Dataset { @Override public void action() { // Force computation of DStream. - dStream.dstream().register(); + dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() { + @Override + public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception { + rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction()); + } + }); } @Override public void setName(String name) { // ignore } + }
