This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9e6fc2cf8c4297996554473a19b2166254ee3f4d Author: Alexey Romanenko <[email protected]> AuthorDate: Fri Dec 7 10:54:12 2018 +0100 Add primitive GroupByKeyTranslatorBatch implementation --- ...KeyTranslatorBatch.java => EncoderHelpers.java} | 22 ++++------ .../translation/TranslationContext.java | 4 +- .../batch/GroupByKeyTranslatorBatch.java | 49 ++++++++++++++++++++-- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java similarity index 56% copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java index 4ee77fb..4c56922 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java @@ -15,20 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +package org.apache.beam.runners.spark.structuredstreaming.translation; -import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; -import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; -class GroupByKeyTranslatorBatch<K, InputT> - implements TransformTranslator< - PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> { +/** {@link Encoders} utility class. */ +public class EncoderHelpers { - @Override - public void translateTransform( - PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform, - TranslationContext context) {} + @SuppressWarnings("unchecked") + public static <T> Encoder<T> encoder() { + return Encoders.kryo((Class<T>) Object.class); + } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 3c29867..e66bc90 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -46,9 +46,9 @@ import org.apache.spark.sql.streaming.StreamingQueryException; */ public class TranslationContext { - /** All the datasets of the DAG */ + /** All the datasets of the DAG. */ private final Map<PValue, Dataset<?>> datasets; - /** datasets that are not used as input to other datasets (leaves of the DAG) */ + /** datasets that are not used as input to other datasets (leaves of the DAG). */ private final Set<Dataset<?>> leaves; private final SparkPipelineOptions options; diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 4ee77fb..7f2d7fa 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -17,18 +17,59 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.KeyValueGroupedDataset; -class GroupByKeyTranslatorBatch<K, InputT> +class GroupByKeyTranslatorBatch<K, V> implements TransformTranslator< - PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> { + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> { @Override public void translateTransform( - PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform, - TranslationContext context) {} + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform, + TranslationContext context) { + + Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput()); + + // group by key only. + KeyValueGroupedDataset<K, KV<K, V>> grouped = + input + .map( + (MapFunction<WindowedValue<KV<K, V>>, KV<K, V>>) WindowedValue::getValue, + EncoderHelpers.encoder()) + .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.<K>encoder()); + + Dataset<KV<K, Iterable<V>>> materialized = + grouped.mapGroups( + (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>) + (key, iterator) -> { + // TODO: can we use here just "Iterable<V> iterable = () -> iterator;" ? + List<V> values = Lists.newArrayList(); + while (iterator.hasNext()) { + values.add(iterator.next().getValue()); + } + return KV.of(key, Iterables.unmodifiableIterable(values)); + }, + EncoderHelpers.encoder()); + + Dataset<WindowedValue<KV<K, Iterable<V>>>> output = + materialized.map( + (MapFunction<KV<K, Iterable<V>>, WindowedValue<KV<K, Iterable<V>>>>) + WindowedValue::valueInGlobalWindow, + EncoderHelpers.encoder()); + + context.putDataset(context.getOutput(), output); + } }
