This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new c0be696  Fix kryo issue in GBK translator with a workaround
c0be696 is described below

commit c0be696c50be0468f2a82d7720e3004dbff30ead
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Wed Feb 6 18:53:40 2019 +0100

    Fix kryo issue in GBK translator with a workaround
---
 .../batch/GroupByKeyTranslatorBatch.java           | 25 +++++++++++++++-------
 .../translation/batch/GroupByKeyTest.java          |  4 ----
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 9ecda56..3626181 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
-import com.google.common.collect.Iterators;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import 
org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
 import 
org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import 
org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
@@ -30,6 +31,8 @@ import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.KeyValueGroupedDataset;
 
+import java.util.List;
+
 class GroupByKeyTranslatorBatch<K, V>
     implements TransformTranslator<
         PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
@@ -41,24 +44,30 @@ class GroupByKeyTranslatorBatch<K, V>
 
     Dataset<WindowedValue<KV<K, V>>> input = 
context.getDataset(context.getInput());
 
+    // Extract key to group by key only.
     KeyValueGroupedDataset<K, KV<K, V>> grouped =
         input
-            // extact KV from WindowedValue
             .map(
                 (MapFunction<WindowedValue<KV<K, V>>, KV<K, V>>) 
WindowedValue::getValue,
                 EncoderHelpers.kvEncoder())
-            // apply the actual GBK providing a way to extract the K
-            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, 
EncoderHelpers.<K>genericEncoder());
+            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, 
EncoderHelpers.genericEncoder());
 
+    // Materialize grouped values, potential OOM because of creation of new iterable
     Dataset<KV<K, Iterable<V>>> materialized =
-        // create KV<K, Iterable<V>>
         grouped.mapGroups(
             (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>)
-                (key, iterator) -> KV.of(key, () -> 
Iterators.transform(iterator, KV::getValue)),
+                // TODO: We need to improve this part and avoid creating of new List (potential OOM)
+                // (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)),
+                (key, iterator) -> {
+                  List<V> values = Lists.newArrayList();
+                  while (iterator.hasNext()) {
+                    values.add(iterator.next().getValue());
+                  }
+                  return KV.of(key, Iterables.unmodifiableIterable(values));
+                },
             EncoderHelpers.kvEncoder());
 
-    // wrap inside a WindowedValue
-    //TODO fix: serialization issue
+    // Window the result into global window.
     Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
         materialized.map(
             (MapFunction<KV<K, Iterable<V>>, WindowedValue<KV<K, 
Iterable<V>>>>)
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index 58a14dc..a069534 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -48,10 +48,6 @@ public class GroupByKeyTest implements Serializable {
     pipeline = Pipeline.create(options);
   }
 
-  @Ignore(
-      "fails with Unable to create serializer "
-          + "\"com.esotericsoftware.kryo.serializers.FieldSerializer\" for 
class: "
-          + "worker.org.gradle.internal.UncheckedException in last map step")
   @Test
   public void testGroupByKey() {
     Map<Integer, Integer> elems = new HashMap<>();

Reply via email to