Repository: beam
Updated Branches:
  refs/heads/master aa45ccb08 -> 4e5a762ef


[BEAM-1512] Optimize leaf transforms materialization


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a2f0615f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a2f0615f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a2f0615f

Branch: refs/heads/master
Commit: a2f0615f27315c3ee4cbe5f67e8c0018797a0e7f
Parents: aa45ccb
Author: Aviem Zur <[email protected]>
Authored: Sun Feb 19 19:52:22 2017 +0200
Committer: Sela <[email protected]>
Committed: Mon Feb 20 12:07:54 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/translation/BoundedDataset.java    | 10 +++++++++-
 .../spark/translation/streaming/UnboundedDataset.java     |  9 ++-------
 2 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a2f0615f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 1cfb0e0..5e19846 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.storage.StorageLevel;
 
 /**
@@ -104,7 +106,13 @@ public class BoundedDataset<T> implements Dataset {
 
   @Override
   public void action() {
-    rdd.count();
+    // Empty function to force computation of RDD.
+    rdd.foreachPartition(new VoidFunction<Iterator<WindowedValue<T>>>() {
+      @Override
+      public void call(Iterator<WindowedValue<T>> windowedValueIterator) throws Exception {
+        // Empty implementation.
+      }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a2f0615f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index 8b65dca..6f5fa93 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
 @@ -32,7 +32,6 @@ import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
@@ -115,12 +114,8 @@ public class UnboundedDataset<T> implements Dataset {
 
   @Override
   public void action() {
-    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
-      @Override
-      public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
-        rdd.count();
-      }
-    });
+    // Force computation of DStream.
+    dStream.dstream().register();
   }
 
   @Override

Reply via email to