Repository: incubator-beam
Updated Branches:
  refs/heads/master f20bf8afd -> 135cb733f


Materialize PCollection/RDD as windowed values with the appropriate windows.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ca3b30d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ca3b30d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ca3b30d

Branch: refs/heads/master
Commit: 1ca3b30dd5679e75ce9e35dc08cc0012fb899186
Parents: d852c5b
Author: Sela <[email protected]>
Authored: Thu Apr 14 22:05:14 2016 +0300
Committer: Sela <[email protected]>
Committed: Tue Apr 19 22:06:15 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    | 62 ++++++++++----------
 1 file changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ca3b30d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 78a62aa..531a6ce 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -37,6 +37,9 @@ import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -76,12 +79,13 @@ public class EvaluationContext implements EvaluationResult {
    */
   private class RDDHolder<T> {
 
-    private Iterable<T> values;
+    private Iterable<WindowedValue<T>> windowedValues;
     private Coder<T> coder;
     private JavaRDDLike<WindowedValue<T>, ?> rdd;
 
     RDDHolder(Iterable<T> values, Coder<T> coder) {
-      this.values = values;
+      this.windowedValues =
+          Iterables.transform(values, 
WindowingHelpers.<T>windowValueFunction());
       this.coder = coder;
     }
 
@@ -91,14 +95,6 @@ public class EvaluationContext implements EvaluationResult {
 
     JavaRDDLike<WindowedValue<T>, ?> getRDD() {
       if (rdd == null) {
-        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
-            new Function<T, WindowedValue<T>>() {
-            @Override
-            public WindowedValue<T> apply(T t) {
-             // TODO: this is wrong if T is a TimestampedValue
-              return WindowedValue.valueInEmptyWindows(t);
-            }
-        });
         WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
             WindowedValue.getValueOnlyCoder(coder);
         rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, 
windowCoder))
@@ -107,29 +103,31 @@ public class EvaluationContext implements EvaluationResult {
       return rdd;
     }
 
-    Iterable<T> getValues(PCollection<T> pcollection) {
-      if (values == null) {
-        coder = pcollection.getCoder();
-        JavaRDDLike<byte[], ?> bytesRDD = 
rdd.map(WindowingHelpers.<T>unwindowFunction())
-            .map(CoderHelpers.toByteFunction(coder));
+    Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
+      if (windowedValues == null) {
+        WindowFn<?, ?> windowFn =
+                pcollection.getWindowingStrategy().getWindowFn();
+        Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
+        final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
+            if (windowFn instanceof GlobalWindows) {
+              windowedValueCoder =
+                  
WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
+            } else {
+              windowedValueCoder =
+                  
WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
+            }
+        JavaRDDLike<byte[], ?> bytesRDD =
+            rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
         List<byte[]> clientBytes = bytesRDD.collect();
-        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
+        windowedValues = Iterables.transform(clientBytes,
+            new Function<byte[], WindowedValue<T>>() {
           @Override
-          public T apply(byte[] bytes) {
-            return CoderHelpers.fromByteArray(bytes, coder);
+          public WindowedValue<T> apply(byte[] bytes) {
+            return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
           }
         });
       }
-      return values;
-    }
-
-    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
-      return Iterables.transform(get(pcollection), new Function<T, 
WindowedValue<T>>() {
-        @Override
-        public WindowedValue<T> apply(T t) {
-          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right 
place?
-        }
-      });
+      return windowedValues;
     }
   }
 
@@ -264,15 +262,15 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
-    @SuppressWarnings("unchecked")
-    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getValues(pcollection);
+    @SuppressWarnings("unchecked") RDDHolder<T> rddHolder = (RDDHolder<T>) 
pcollections.get(pcollection);
+    Iterable<WindowedValue<T>> windowedValues = 
rddHolder.getValues(pcollection);
+    return Iterables.transform(windowedValues, 
WindowingHelpers.<T>unwindowValueFunction());
   }
 
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) 
{
     @SuppressWarnings("unchecked")
     RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
-    return rddHolder.getWindowedValues(pcollection);
+    return rddHolder.getValues(pcollection);
   }
 
   @Override

Reply via email to