This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 57ce2d180f94de28f718f355a8a5bc69f940b3be Author: Alexey Romanenko <[email protected]> AuthorDate: Mon Dec 10 10:52:19 2018 +0100 Use Iterators.transform() to return Iterable --- .../translation/batch/GroupByKeyTranslatorBatch.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 7f2d7fa..0ff0750 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -17,9 +17,7 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.List; +import com.google.common.collect.Iterators; import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; @@ -54,14 +52,7 @@ class GroupByKeyTranslatorBatch<K, V> Dataset<KV<K, Iterable<V>>> materialized = grouped.mapGroups( (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>) - (key, iterator) -> { - // TODO: can we use here just "Iterable<V> iterable = () -> iterator;" ? - List<V> values = Lists.newArrayList(); - while (iterator.hasNext()) { - values.add(iterator.next().getValue()); - } - return KV.of(key, Iterables.unmodifiableIterable(values)); - }, + (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)), EncoderHelpers.encoder()); Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
