Repository: beam
Updated Branches: refs/heads/master aa45ccb08 -> 4e5a762ef
[BEAM-1512] Optimize leaf transforms materialization Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a2f0615f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a2f0615f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a2f0615f Branch: refs/heads/master Commit: a2f0615f27315c3ee4cbe5f67e8c0018797a0e7f Parents: aa45ccb Author: Aviem Zur <[email protected]> Authored: Sun Feb 19 19:52:22 2017 +0200 Committer: Sela <[email protected]> Committed: Mon Feb 20 12:07:54 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/translation/BoundedDataset.java | 10 +++++++++- .../spark/translation/streaming/UnboundedDataset.java | 9 ++------- 2 files changed, 11 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a2f0615f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 1cfb0e0..5e19846 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -32,6 +33,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import 
org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.storage.StorageLevel; /** @@ -104,7 +106,13 @@ public class BoundedDataset<T> implements Dataset { @Override public void action() { - rdd.count(); + // Empty function to force computation of RDD. + rdd.foreachPartition(new VoidFunction<Iterator<WindowedValue<T>>>() { + @Override + public void call(Iterator<WindowedValue<T>> windowedValueIterator) throws Exception { + // Empty implementation. + } + }); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a2f0615f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java index 8b65dca..6f5fa93 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -32,7 +32,6 @@ import org.apache.beam.runners.spark.translation.WindowingHelpers; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.slf4j.Logger; @@ -115,12 +114,8 @@ public class UnboundedDataset<T> implements Dataset { @Override public void action() { - dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() { - @Override - public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception { - rdd.count(); - } - }); + // Force computation of DStream. 
+ dStream.dstream().register();
 }
 @Override
