This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new c0be696  Fix kryo issue in GBK translator with a workaround
c0be696 is described below

commit c0be696c50be0468f2a82d7720e3004dbff30ead
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Wed Feb 6 18:53:40 2019 +0100

    Fix kryo issue in GBK translator with a workaround
---
 .../batch/GroupByKeyTranslatorBatch.java | 25 +++++++++++++++-------
 .../translation/batch/GroupByKeyTest.java |  4 ----
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 9ecda56..3626181 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
-import com.google.common.collect.Iterators;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
@@ -30,6 +31,8 @@ import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.KeyValueGroupedDataset;
 
+import java.util.List;
+
 class GroupByKeyTranslatorBatch<K, V>
     implements TransformTranslator<
         PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
@@ -41,24 +44,30 @@ class GroupByKeyTranslatorBatch<K, V>
 
     Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput());
 
+    // Extract key to group by key only.
     KeyValueGroupedDataset<K, KV<K, V>> grouped =
         input
-            // extact KV from WindowedValue
             .map(
                 (MapFunction<WindowedValue<KV<K, V>>, KV<K, V>>) WindowedValue::getValue,
                 EncoderHelpers.kvEncoder())
-            // apply the actual GBK providing a way to extract the K
-            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.<K>genericEncoder());
+            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.genericEncoder());
 
+    // Materialize grouped values, potential OOM because of creation of new iterable
     Dataset<KV<K, Iterable<V>>> materialized =
-        // create KV<K, Iterable<V>>
         grouped.mapGroups(
             (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>)
-                (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)),
+                // TODO: We need to improve this part and avoid creating of new List (potential OOM)
+                // (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)),
+                (key, iterator) -> {
+                  List<V> values = Lists.newArrayList();
+                  while (iterator.hasNext()) {
+                    values.add(iterator.next().getValue());
+                  }
+                  return KV.of(key, Iterables.unmodifiableIterable(values));
+                },
             EncoderHelpers.kvEncoder());
 
-    // wrap inside a WindowedValue
-    //TODO fix: serialization issue
+    // Window the result into global window.
     Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
         materialized.map(
             (MapFunction<KV<K, Iterable<V>>, WindowedValue<KV<K, Iterable<V>>>>)
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index 58a14dc..a069534 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -48,10 +48,6 @@ public class GroupByKeyTest implements Serializable {
     pipeline = Pipeline.create(options);
   }
 
-  @Ignore(
-      "fails with Unable to create serializer "
-          + "\"com.esotericsoftware.kryo.serializers.FieldSerializer\" for class: "
-          + "worker.org.gradle.internal.UncheckedException in last map step")
   @Test
   public void testGroupByKey() {
     Map<Integer, Integer> elems = new HashMap<>();