Repository: incubator-beam
Updated Branches:
  refs/heads/master 215980ad3 -> 9c3e3e7a3


Directly implement ReifyTimestampsAndWindows in SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/597e3955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/597e3955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/597e3955

Branch: refs/heads/master
Commit: 597e3955c219a7c50df124a0689b99b98dfbbbc9
Parents: 215980a
Author: Kenneth Knowles <[email protected]>
Authored: Thu Oct 27 22:18:19 2016 -0700
Committer: Sela <[email protected]>
Committed: Fri Oct 28 10:56:44 2016 +0300

----------------------------------------------------------------------
 .../translation/GroupCombineFunctions.java      |  5 +--
 .../ReifyTimestampsAndWindowsFunction.java      | 47 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index e2a0f87..421b1b0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -48,7 +47,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import scala.Tuple2;
 
 
@@ -77,8 +75,7 @@ public class GroupCombineFunctions {
     // Use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
     JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
-        rdd.mapPartitions(new DoFnFunction<KV<K, V>, KV<K, 
WindowedValue<V>>>(null,
-                new ReifyTimestampAndWindowsDoFn<K, V>(), runtimeContext, 
null, null))
+        rdd.map(new ReifyTimestampsAndWindowsFunction<K, V>())
             .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
             .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
new file mode 100644
index 0000000..8281c17
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Simple {@link Function} that brings the windowing information into the value, moving it out of
+ * the implicit background representation of the {@link PCollection}.
+ *
+ * <p>Each input {@code WindowedValue<KV<K, V>>} is mapped to a
+ * {@code WindowedValue<KV<K, WindowedValue<V>>>}. The key is passed through unchanged. The value
+ * {@code V} is wrapped in a new {@link WindowedValue} that carries the element's own timestamp,
+ * windows and pane. The resulting {@link KV} is then wrapped again with that same timestamp,
+ * windows and pane.
+ *
+ * <p>This is a direct, element-wise replacement for running {@code ReifyTimestampAndWindowsDoFn}
+ * through a {@code DoFnFunction}. It is applied with a plain {@code map} and is constructed without
+ * a runtime context. The class declares no fields.
+ *
+ * @param <K> the key type, passed through unchanged
+ * @param <V> the value type, which is wrapped in its own {@link WindowedValue}
+ */
+public class ReifyTimestampsAndWindowsFunction<K, V>
+    implements Function<WindowedValue<KV<K, V>>, WindowedValue<KV<K, 
WindowedValue<V>>>> {
+  /**
+   * Reifies the windowing information of a single element.
+   *
+   * @param elem the windowed key/value element to transform
+   * @return a {@link WindowedValue} with the same timestamp, windows and pane as {@code elem}.
+   *     Its value is a {@link KV} of the original key and the original value, where the value is
+   *     wrapped in a {@link WindowedValue} that also carries {@code elem}'s timestamp, windows and
+   *     pane.
+   * @throws Exception declared to match the overridden Spark {@link Function#call} signature
+   */
+  @Override
+  public WindowedValue<KV<K, WindowedValue<V>>> call(WindowedValue<KV<K, V>> 
elem)
+      throws Exception {
+    // Copy the element's timestamp, windows and pane into the value itself. They then remain
+    // available after the outer WindowedValue is stripped (e.g. by the unwindow step that follows
+    // this function in GroupCombineFunctions).
+    return WindowedValue.of(
+        KV.of(
+            elem.getValue().getKey(),
+            WindowedValue.of(
+                elem.getValue().getValue(),
+                elem.getTimestamp(),
+                elem.getWindows(),
+                elem.getPane())),
+        // The outer element keeps the same windowing metadata as the input.
+        elem.getTimestamp(),
+        elem.getWindows(),
+        elem.getPane());
+  }
+}

Reply via email to