echauchot commented on code in PR #22446:
URL: https://github.com/apache/beam/pull/22446#discussion_r976241074
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
+ * function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
+ * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory-sensitive iterable is
+ * used that can be traversed just once. Attempting to traverse the iterable again will throw.
+ *
+ * <ul>
+ *   <li>When using the default global window, window information is dropped and restored after the
+ *       aggregation.
+ *   <li>For non-merging windows, windows are exploded and moved into a composite key for better
+ *       distribution. However, to keep the amount of shuffled data low, this is only done if values
+ *       are assigned to a single window or if there are only a few keys and distributing the data
+ *       is important. After the aggregation, windowed values are restored from the composite key.
+ *   <li>All other cases are implemented using the SDK {@link ReduceFnRunner}.
+ * </ul>
+ */
class GroupByKeyTranslatorBatch<K, V>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+ extends GroupingTranslator<K, V, Iterable<V>, GroupByKey<K, V>> {
+
+ /** Literal of binary encoded Pane info. */
+  private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+
+ /** Defaults for value in single global window. */
+ private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
+
+ private boolean useCollectList = true;
+
+ public GroupByKeyTranslatorBatch() {}
+
+ public GroupByKeyTranslatorBatch(boolean useCollectList) {
+ this.useCollectList = useCollectList;
+ }
@Override
-  public void translateTransform(
-      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
-      AbstractTranslationContext context) {
-
-    @SuppressWarnings("unchecked")
-    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
-    Coder<V> valueCoder = kvCoder.getValueCoder();
-
-    // group by key only
-    Coder<K> keyCoder = kvCoder.getKeyCoder();
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
-
- // group also by windows
- WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
- windowingStrategy.getWindowFn().windowCoder());
- Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
- groupByKeyOnly.flatMapGroups(
- new GroupAlsoByWindowViaOutputBufferFn<>(
- windowingStrategy,
- new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(valueCoder),
- context.getSerializableOptions()),
- EncoderHelpers.fromBeamCoder(outputCoder));
-
- context.putDataset(context.getOutput(), output);
+ public void translate(GroupByKey<K, V> transform, Context cxt) {
+ WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
+ TimestampCombiner tsCombiner = windowing.getTimestampCombiner();
+
+ Dataset<WindowedValue<KV<K, V>>> input = cxt.getDataset(cxt.getInput());
+
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) cxt.getInput().getCoder();
+    KvCoder<K, Iterable<V>> outputCoder = (KvCoder<K, Iterable<V>>) cxt.getOutput().getCoder();
+
+    Encoder<V> valueEnc = cxt.valueEncoderOf(inputCoder);
+    Encoder<K> keyEnc = cxt.keyEncoderOf(inputCoder);
+
+    // In batch we can ignore triggering and allowed lateness parameters
+    final Dataset<WindowedValue<KV<K, Iterable<V>>>> result;
+
+    if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
+      // Collects all values per key in memory. This might be problematic if there are only a few
+      // keys or the distribution is highly skewed.
+      result =
+          input
+              .groupBy(col("value.key").as("key"))
+              .agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
+              .select(
+                  inGlobalWindow(
+                      keyValue(col("key").as(keyEnc), col("values").as(iterableEnc(valueEnc))),
+                      windowTimestamp(tsCombiner)));
+
+ } else if (eligibleForGlobalGroupBy(windowing, true)) {
Review Comment:
Can you add javadoc for the endOfWindowOnly boolean?
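
For instance, something along these lines (a sketch only; it assumes, based on the END_OF_WINDOW import in GroupingTranslator, that the flag restricts eligibility to the END_OF_WINDOW timestamp combiner — adjust to match the actual semantics):

    /**
     * Checks if the {@link WindowingStrategy} is eligible for a global group-by that ignores
     * windowing.
     *
     * @param endOfWindowOnly if true, only accept strategies using the {@code END_OF_WINDOW}
     *     {@link TimestampCombiner}, so the output timestamp can be derived from the window
     *     alone (assumed semantics).
     */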
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
+ * function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
+ * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory-sensitive iterable is
+ * used that can be traversed just once. Attempting to traverse the iterable again will throw.
+ *
+ * <ul>
+ *   <li>When using the default global window, window information is dropped and restored after the
+ *       aggregation.
+ *   <li>For non-merging windows, windows are exploded and moved into a composite key for better
+ *       distribution. However, to keep the amount of shuffled data low, this is only done if values
+ *       are assigned to a single window or if there are only a few keys and distributing the data
+ *       is important. After the aggregation, windowed values are restored from the composite key.
+ *   <li>All other cases are implemented using the SDK {@link ReduceFnRunner}.
+ * </ul>
+ */
class GroupByKeyTranslatorBatch<K, V>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+ extends GroupingTranslator<K, V, Iterable<V>, GroupByKey<K, V>> {
+
+ /** Literal of binary encoded Pane info. */
+  private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+
+ /** Defaults for value in single global window. */
+ private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
+
+ private boolean useCollectList = true;
+
+ public GroupByKeyTranslatorBatch() {}
+
+ public GroupByKeyTranslatorBatch(boolean useCollectList) {
+ this.useCollectList = useCollectList;
+ }
@Override
-  public void translateTransform(
-      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
-      AbstractTranslationContext context) {
-
-    @SuppressWarnings("unchecked")
-    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
-    Coder<V> valueCoder = kvCoder.getValueCoder();
-
-    // group by key only
-    Coder<K> keyCoder = kvCoder.getKeyCoder();
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
-
- // group also by windows
- WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
- windowingStrategy.getWindowFn().windowCoder());
- Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
- groupByKeyOnly.flatMapGroups(
- new GroupAlsoByWindowViaOutputBufferFn<>(
- windowingStrategy,
- new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(valueCoder),
- context.getSerializableOptions()),
- EncoderHelpers.fromBeamCoder(outputCoder));
-
- context.putDataset(context.getOutput(), output);
+ public void translate(GroupByKey<K, V> transform, Context cxt) {
+ WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
+ TimestampCombiner tsCombiner = windowing.getTimestampCombiner();
+
+ Dataset<WindowedValue<KV<K, V>>> input = cxt.getDataset(cxt.getInput());
+
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) cxt.getInput().getCoder();
+    KvCoder<K, Iterable<V>> outputCoder = (KvCoder<K, Iterable<V>>) cxt.getOutput().getCoder();
+
+    Encoder<V> valueEnc = cxt.valueEncoderOf(inputCoder);
+    Encoder<K> keyEnc = cxt.keyEncoderOf(inputCoder);
+
+    // In batch we can ignore triggering and allowed lateness parameters
+    final Dataset<WindowedValue<KV<K, Iterable<V>>>> result;
+
+    if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
+      // Collects all values per key in memory. This might be problematic if there are only a few
+      // keys or the distribution is highly skewed.
+      result =
+          input
+              .groupBy(col("value.key").as("key"))
+              .agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
Review Comment:
Clever!
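
For readers of this thread, a rough sketch of what timestampAggregator(tsCombiner) presumably does, given the min/max imports above (illustrative only, not necessarily the PR's exact helper):

    // Hypothetical sketch: map the Beam TimestampCombiner onto a Spark SQL aggregate.
    private static Column timestampAggregator(TimestampCombiner tsCombiner) {
      Column ts = col("timestamp");
      // EARLIEST keeps the smallest timestamp of the group, LATEST the largest; for
      // END_OF_WINDOW the timestamp is derived from the window itself afterwards.
      return TimestampCombiner.EARLIEST.equals(tsCombiner)
          ? min(ts).as("timestamp")
          : max(ts).as("timestamp");
    }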
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupingTranslator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.tuple;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.sdk.transforms.windowing.TimestampCombiner.END_OF_WINDOW;
+
+import java.util.Collection;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
+import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Encoder;
+import scala.Tuple2;
+import scala.collection.TraversableOnce;
+
+abstract class GroupingTranslator<
Review Comment:
nit: consider adding "Helper" to the name since these are only utility methods. That would make the code quicker to read.
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
+ * function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
+ * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory-sensitive iterable is
+ * used that can be traversed just once. Attempting to traverse the iterable again will throw.
+ *
+ * <ul>
+ *   <li>When using the default global window, window information is dropped and restored after the
+ *       aggregation.
+ *   <li>For non-merging windows, windows are exploded and moved into a composite key for better
+ *       distribution. However, to keep the amount of shuffled data low, this is only done if values
+ *       are assigned to a single window or if there are only a few keys and distributing the data
+ *       is important. After the aggregation, windowed values are restored from the composite key.
+ *   <li>All other cases are implemented using the SDK {@link ReduceFnRunner}.
+ * </ul>
+ */
class GroupByKeyTranslatorBatch<K, V>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+ extends GroupingTranslator<K, V, Iterable<V>, GroupByKey<K, V>> {
+
+ /** Literal of binary encoded Pane info. */
+  private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+
+ /** Defaults for value in single global window. */
+ private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
+
+ private boolean useCollectList = true;
+
+ public GroupByKeyTranslatorBatch() {}
+
+ public GroupByKeyTranslatorBatch(boolean useCollectList) {
+ this.useCollectList = useCollectList;
+ }
@Override
-  public void translateTransform(
-      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
-      AbstractTranslationContext context) {
-
-    @SuppressWarnings("unchecked")
-    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
-    Coder<V> valueCoder = kvCoder.getValueCoder();
-
-    // group by key only
-    Coder<K> keyCoder = kvCoder.getKeyCoder();
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
-
- // group also by windows
- WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
- windowingStrategy.getWindowFn().windowCoder());
- Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
- groupByKeyOnly.flatMapGroups(
- new GroupAlsoByWindowViaOutputBufferFn<>(
- windowingStrategy,
- new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(valueCoder),
- context.getSerializableOptions()),
- EncoderHelpers.fromBeamCoder(outputCoder));
-
- context.putDataset(context.getOutput(), output);
+ public void translate(GroupByKey<K, V> transform, Context cxt) {
+ WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
+ TimestampCombiner tsCombiner = windowing.getTimestampCombiner();
+
+ Dataset<WindowedValue<KV<K, V>>> input = cxt.getDataset(cxt.getInput());
+
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) cxt.getInput().getCoder();
+    KvCoder<K, Iterable<V>> outputCoder = (KvCoder<K, Iterable<V>>) cxt.getOutput().getCoder();
+
+    Encoder<V> valueEnc = cxt.valueEncoderOf(inputCoder);
+    Encoder<K> keyEnc = cxt.keyEncoderOf(inputCoder);
+
+    // In batch we can ignore triggering and allowed lateness parameters
+    final Dataset<WindowedValue<KV<K, Iterable<V>>>> result;
+
+    if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
+      // Collects all values per key in memory. This might be problematic if there are only a few
+      // keys or the distribution is highly skewed.
+      result =
+          input
+              .groupBy(col("value.key").as("key"))
+              .agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
+              .select(
+                  inGlobalWindow(
+                      keyValue(col("key").as(keyEnc), col("values").as(iterableEnc(valueEnc))),
+                      windowTimestamp(tsCombiner)));
+
+    } else if (eligibleForGlobalGroupBy(windowing, true)) {
+      // Produces an iterable that can be traversed exactly once. However, on the plus side, data
+      // is not collected in memory until it is serialized or consumed by the user.
+      result =
+          cxt.getDataset(cxt.getInput())
+              .groupByKey(valueKey(), keyEnc)
+              .mapValues(valueValue(), cxt.valueEncoderOf(inputCoder))
+              .mapGroups(fun2((k, it) -> KV.of(k, iterableOnce(it))), cxt.kvEncoderOf(outputCoder))
+              .map(fun1(WindowedValue::valueInGlobalWindow), cxt.windowedEncoder(outputCoder));
+
+    } else if (useCollectList
+        && eligibleForGroupByWindow(windowing, false)
+        && (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
+      // Using the window as part of the key should help to better distribute the data. However,
+      // if values are assigned to multiple windows, more data would be shuffled around. If there
+      // are only a few keys, this is still valuable.
+      // Collects all values per key & window in memory.
+      result =
+          input
+              .select(explode(col("windows")).as("window"), col("value"), col("timestamp"))
+              .groupBy(col("value.key"), col("window"))
+              .agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
+              .select(
+                  inSingleWindow(
+                      keyValue(col("key").as(keyEnc), col("values").as(iterableEnc(valueEnc))),
+                      col("window").as(cxt.windowEncoder()),
+                      windowTimestamp(tsCombiner)));
+
+    } else if (eligibleForGroupByWindow(windowing, true)
+        && (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
+      // Using the window as part of the key should help to better distribute the data. However,
+      // if values are assigned to multiple windows, more data would be shuffled around. If there
+      // are only a few keys, this is still valuable.
+      // Produces an iterable that can be traversed exactly once. However, on the plus side, data
+      // is not collected in memory until it is serialized or consumed by the user.
+      Encoder<Tuple2<BoundedWindow, K>> windowedKeyEnc = windowedKeyEnc(keyEnc, cxt);
+      result =
+          cxt.getDataset(cxt.getInput())
+              .flatMap(explodeWindowedKey(valueValue()), cxt.tupleEncoder(windowedKeyEnc, valueEnc))
+              .groupByKey(fun1(Tuple2::_1), windowedKeyEnc)
+              .mapValues(fun1(Tuple2::_2), valueEnc)
+              .mapGroups(
+                  fun2((wKey, it) -> windowedKV(wKey, iterableOnce(it))),
+                  cxt.windowedEncoder(outputCoder));
+
+    } else {
+      // Collects all values per key in memory. This might be problematic if there are only a few
+      // keys or the distribution is highly skewed.
+
+      // FIXME Revisit this case, implementation is far from ideal:
+      // - iterator traversed at least twice, forcing materialization in memory
Review Comment:
Yes, that was the problem I had that forced materialization. The only benefit I see is that ReduceFnRunner at least does all the window merging on complex windowing for you.
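
For reference, a simplified sketch of that fallback, keeping the shape of the removed implementation above ("options" stands for the serializable pipeline options used in the old code; names are assumptions):

    Dataset<WindowedValue<KV<K, Iterable<V>>>> fallback =
        input
            .groupByKey(valueKey(), keyEnc)
            .flatMapGroups(
                // ReduceFnRunner (via GroupAlsoByWindowViaOutputBufferFn) handles the
                // window merging per key group, at the cost of materializing the group.
                new GroupAlsoByWindowViaOutputBufferFn<>(
                    windowing,
                    new InMemoryStateInternalsFactory<>(),
                    SystemReduceFn.buffering(inputCoder.getValueCoder()),
                    options),
                cxt.windowedEncoder(outputCoder));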
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
+ * function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
+ * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory-sensitive iterable is
Review Comment:
The new phrasing is clearer.
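
To make the traverse-once contract concrete, here is a minimal standalone illustration of the idea (the PR's actual helper wraps a Scala Iterator; this version exists only for this thread):

    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicBoolean;

    /** Illustrative only: an Iterable whose single traversal consumes the backing iterator. */
    static <T> Iterable<T> iterableOnce(Iterator<T> it) {
      AtomicBoolean traversed = new AtomicBoolean(false);
      return () -> {
        if (!traversed.compareAndSet(false, true)) {
          throw new IllegalStateException("Iterable can only be traversed once");
        }
        return it; // a second traversal throws instead of replaying buffered data
      };
    }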
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
+ * function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
+ * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory-sensitive iterable is
+ * used that can be traversed just once. Attempting to traverse the iterable again will throw.
+ *
+ * <ul>
+ *   <li>When using the default global window, window information is dropped and restored after the
+ *       aggregation.
+ *   <li>For non-merging windows, windows are exploded and moved into a composite key for better
+ *       distribution. However, to keep the amount of shuffled data low, this is only done if values
+ *       are assigned to a single window or if there are only a few keys and distributing the data
+ *       is important. After the aggregation, windowed values are restored from the composite key.
+ *   <li>All other cases are implemented using the SDK {@link ReduceFnRunner}.
+ * </ul>
+ */
class GroupByKeyTranslatorBatch<K, V>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+ extends GroupingTranslator<K, V, Iterable<V>, GroupByKey<K, V>> {
+
+ /** Literal of binary encoded Pane info. */
+  private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+
+ /** Defaults for value in single global window. */
+ private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
+
+ private boolean useCollectList = true;
+
+ public GroupByKeyTranslatorBatch() {}
+
+ public GroupByKeyTranslatorBatch(boolean useCollectList) {
+ this.useCollectList = useCollectList;
+ }
@Override
-  public void translateTransform(
-      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
-      AbstractTranslationContext context) {
-
-    @SuppressWarnings("unchecked")
-    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
-    Coder<V> valueCoder = kvCoder.getValueCoder();
-
-    // group by key only
-    Coder<K> keyCoder = kvCoder.getKeyCoder();
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
-
- // group also by windows
- WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
- windowingStrategy.getWindowFn().windowCoder());
- Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
- groupByKeyOnly.flatMapGroups(
- new GroupAlsoByWindowViaOutputBufferFn<>(
- windowingStrategy,
- new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(valueCoder),
- context.getSerializableOptions()),
- EncoderHelpers.fromBeamCoder(outputCoder));
-
- context.putDataset(context.getOutput(), output);
+ public void translate(GroupByKey<K, V> transform, Context cxt) {
+ WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
+ TimestampCombiner tsCombiner = windowing.getTimestampCombiner();
+
+ Dataset<WindowedValue<KV<K, V>>> input = cxt.getDataset(cxt.getInput());
+
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) cxt.getInput().getCoder();
+    KvCoder<K, Iterable<V>> outputCoder = (KvCoder<K, Iterable<V>>) cxt.getOutput().getCoder();
+
+    Encoder<V> valueEnc = cxt.valueEncoderOf(inputCoder);
+    Encoder<K> keyEnc = cxt.keyEncoderOf(inputCoder);
+
+    // In batch we can ignore triggering and allowed lateness parameters
+    final Dataset<WindowedValue<KV<K, Iterable<V>>>> result;
+
+    if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
+      // Collects all values per key in memory. This might be problematic if there are only a few
+      // keys or the distribution is highly skewed.
+      result =
+          input
+              .groupBy(col("value.key").as("key"))
+              .agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
Review Comment:
Good to do this at the low level instead of using the Beam SDK.
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation
+ * function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
+ * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory-sensitive iterable is
+ * used that can be traversed just once. Attempting to traverse the iterable again will throw.
+ *
+ * <ul>
+ *   <li>When using the default global window, window information is dropped and restored after the
+ *       aggregation.
+ *   <li>For non-merging windows, windows are exploded and moved into a composite key for better
+ *       distribution. However, to keep the amount of shuffled data low, this is only done if values
+ *       are assigned to a single window or if there are only a few keys and distributing the data
+ *       is important. After the aggregation, windowed values are restored from the composite key.
+ *   <li>All other cases are implemented using the SDK {@link ReduceFnRunner}.
+ * </ul>
+ */
class GroupByKeyTranslatorBatch<K, V>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+ extends GroupingTranslator<K, V, Iterable<V>, GroupByKey<K, V>> {
+
+ /** Literal of binary encoded Pane info. */
+  private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+
+ /** Defaults for value in single global window. */
+ private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
+
+ private boolean useCollectList = true;
+
+ public GroupByKeyTranslatorBatch() {}
+
+ public GroupByKeyTranslatorBatch(boolean useCollectList) {
+ this.useCollectList = useCollectList;
+ }
@Override
-  public void translateTransform(
-      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
-      AbstractTranslationContext context) {
-
-    @SuppressWarnings("unchecked")
-    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
-    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
-    KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
-    Coder<V> valueCoder = kvCoder.getValueCoder();
-
-    // group by key only
-    Coder<K> keyCoder = kvCoder.getKeyCoder();
-    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
-        input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
-
- // group also by windows
- WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
- windowingStrategy.getWindowFn().windowCoder());
- Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
- groupByKeyOnly.flatMapGroups(
- new GroupAlsoByWindowViaOutputBufferFn<>(
- windowingStrategy,
- new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(valueCoder),
- context.getSerializableOptions()),
- EncoderHelpers.fromBeamCoder(outputCoder));
-
- context.putDataset(context.getOutput(), output);
+ public void translate(GroupByKey<K, V> transform, Context cxt) {
+ WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
+ TimestampCombiner tsCombiner = windowing.getTimestampCombiner();
+
+ Dataset<WindowedValue<KV<K, V>>> input = cxt.getDataset(cxt.getInput());
+
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) cxt.getInput().getCoder();
+    KvCoder<K, Iterable<V>> outputCoder = (KvCoder<K, Iterable<V>>) cxt.getOutput().getCoder();
+
+    Encoder<V> valueEnc = cxt.valueEncoderOf(inputCoder);
+    Encoder<K> keyEnc = cxt.keyEncoderOf(inputCoder);
+
+    // In batch we can ignore triggering and allowed lateness parameters
+    final Dataset<WindowedValue<KV<K, Iterable<V>>>> result;
+
+    if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
+      // Collects all values per key in memory. This might be problematic if there are only a few
+      // keys or the distribution is highly skewed.
+      result =
+          input
+              .groupBy(col("value.key").as("key"))
+              .agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
+              .select(
+                  inGlobalWindow(
+                      keyValue(col("key").as(keyEnc), col("values").as(iterableEnc(valueEnc))),
+                      windowTimestamp(tsCombiner)));
+
+    } else if (eligibleForGlobalGroupBy(windowing, true)) {
+      // Produces an iterable that can be traversed exactly once. However, on the plus side, data
+      // is not collected in memory until it is serialized or consumed by the user.
+      result =
+          cxt.getDataset(cxt.getInput())
+              .groupByKey(valueKey(), keyEnc)
+              .mapValues(valueValue(), cxt.valueEncoderOf(inputCoder))
+              .mapGroups(fun2((k, it) -> KV.of(k, iterableOnce(it))), cxt.kvEncoderOf(outputCoder))
+              .map(fun1(WindowedValue::valueInGlobalWindow), cxt.windowedEncoder(outputCoder));
+
+    } else if (useCollectList
+        && eligibleForGroupByWindow(windowing, false)
+        && (windowing.getWindowFn().assignsToOneWindow() || transform.fewKeys())) {
Review Comment:
Clever perf improvement!
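
To spell out the trick: moving the window into the key changes the shuffle key from key to (window, key). A hypothetical, simplified helper to illustrate (the PR's explodeWindowedKey returns a Spark FlatMapFunction instead; this uses java.util.stream.Stream):

    // A value in windows [w1, w2] yields ((w1, key), v) and ((w2, key), v), so hot keys
    // spanning many windows are spread across partitions.
    static <K, V> Stream<Tuple2<Tuple2<BoundedWindow, K>, V>> explodeWindowedKey(
        WindowedValue<KV<K, V>> wv) {
      K key = wv.getValue().getKey();
      V value = wv.getValue().getValue();
      return wv.getWindows().stream().map(w -> new Tuple2<>(new Tuple2<>(w, key), value));
    }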
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers.toByteArray;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.collectionEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.encoderOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.kvEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers.windowedValueEncoder;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.concat;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun1;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.fun2;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.javaIterator;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
+import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.collect_list;
+import static org.apache.spark.sql.functions.explode;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.struct;
+
import java.io.Serializable;
import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
-import
org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
-import
org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import
org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
-import
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.sql.catalyst.expressions.CreateArray;
+import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Seq;
+import scala.collection.immutable.List;
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in
+ * aggregation function {@code collect_list} when applicable.
Review Comment:
Ouch, I was hoping you would avoid collect_list. But at least you managed to avoid OOMs in some cases compared to the previous impl.
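For context, a tiny standalone example of the collect_list behavior at issue (toy data, not the translator's code): each result row materializes all values of its key in one array, which is exactly the OOM risk when there are few keys or a skewed distribution.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CollectListSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    Dataset<Row> kv =
        spark.sql("SELECT * FROM VALUES ('a', 1), ('a', 2), ('b', 3) AS t(key, value)");
    // Each output row holds *all* values of its key in one in-memory array:
    // fine for many small groups, risky for few keys or heavy skew.
    kv.groupBy(col("key")).agg(collect_list(col("value")).as("values")).show();
    spark.stop();
  }
}
```

The traverse-once iterable path described in the javadoc avoids exactly this up-front materialization.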
##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java:
##########
@@ -17,74 +17,264 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+/**
+ * Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in
+ * aggregation function {@code collect_list} when applicable.
+ *
+ * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
+ * latter case the entire group (iterator) has to be loaded into memory as well, risking OOM errors
+ * in both cases. When disabling {@link #useCollectList}, a more memory-sensitive iterable is used
+ * that can be traversed just once. Attempting to traverse the iterable again will throw.
+ *
+ * <ul>
+ * <li>When using the default global window, window information is dropped and restored after the
+ * aggregation.
+ * <li>For non-merging windows, windows are exploded and moved into a composite key for better
+ * distribution. Though, to keep the amount of shuffled data low, this is only done if values
+ * are assigned to a single window or if there are only few keys and distributing data is
+ * important. After the aggregation, windowed values are restored from the composite key.
+ * <li>All other cases are implemented using the SDK {@link ReduceFnRunner}.
+ * </ul>
+ */
class GroupByKeyTranslatorBatch<K, V>
- implements TransformTranslator<
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
+ extends GroupingTranslator<K, V, Iterable<V>, GroupByKey<K, V>> {
+
+ /** Literal of binary-encoded PaneInfo. */
+ private static final Expression PANE_NO_FIRING = lit(toByteArray(NO_FIRING, PaneInfoCoder.of()));
+
+ /** Defaults for value in single global window. */
+ private static final List<Expression> GLOBAL_WINDOW_DETAILS =
+ windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
+
+ private boolean useCollectList = true;
+
+ public GroupByKeyTranslatorBatch() {}
+
+ public GroupByKeyTranslatorBatch(boolean useCollectList) {
+ this.useCollectList = useCollectList;
+ }
@Override
- public void translateTransform(
- PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
- AbstractTranslationContext context) {
-
- @SuppressWarnings("unchecked")
- final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
- Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
- WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
- KvCoder<K, V> kvCoder = (KvCoder<K, V>) inputPCollection.getCoder();
- Coder<V> valueCoder = kvCoder.getValueCoder();
-
- // group by key only
- Coder<K> keyCoder = kvCoder.getKeyCoder();
- KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly =
- input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
-
- // group also by windows
- WindowedValue.FullWindowedValueCoder<KV<K, Iterable<V>>> outputCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
- windowingStrategy.getWindowFn().windowCoder());
- Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
- groupByKeyOnly.flatMapGroups(
- new GroupAlsoByWindowViaOutputBufferFn<>(
- windowingStrategy,
- new InMemoryStateInternalsFactory<>(),
- SystemReduceFn.buffering(valueCoder),
- context.getSerializableOptions()),
- EncoderHelpers.fromBeamCoder(outputCoder));
-
- context.putDataset(context.getOutput(), output);
+ public void translate(GroupByKey<K, V> transform, Context cxt) {
+ WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
+ TimestampCombiner tsCombiner = windowing.getTimestampCombiner();
+
+ Dataset<WindowedValue<KV<K, V>>> input = cxt.getDataset(cxt.getInput());
+
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) cxt.getInput().getCoder();
+ KvCoder<K, Iterable<V>> outputCoder = (KvCoder<K, Iterable<V>>) cxt.getOutput().getCoder();
+
+ Encoder<V> valueEnc = cxt.valueEncoderOf(inputCoder);
+ Encoder<K> keyEnc = cxt.keyEncoderOf(inputCoder);
+
+ // In batch we can ignore triggering and allowed lateness parameters
Review Comment:
A processing-time trigger in batch mode is more a synonym for "not based on
event time" than for "based on wall clock". In that case the trigger is based
on elements: it fires every 2 elements seen. So I think that makes sense in
batch mode.
Regarding watermark triggers in batch mode, they are better understood as
triggers based on event timestamps, as there is no flow of data in batch mode:
consider that event timestamps can be put on elements even in batch mode
(cf. replaying a streaming recording), and in that case triggering on the event
timestamp makes sense.
But I agree these are corner cases, not frequent at all among users.
Both nexmark queries run fine, but beware that nexmark does not check the
correctness of the data. So if there is no VR test on these corner cases, it is
not certain that this behavior is correct.
Anyway, fair enough, as the RDD runner does the same and AFAIK no user has
complained.
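To illustrate the corner case, here is a hypothetical bounded pipeline using only standard Beam APIs: elements carry event timestamps and declare an element-count trigger, which a batch runner such as this translator may simply ignore.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BatchEventTimeSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of(KV.of("k", 1L), KV.of("k", 2L), KV.of("k", 7L)))
        // Bounded input, but each element still gets an event timestamp,
        // e.g. replaying a recorded stream as a batch source.
        .apply(WithTimestamps.of((KV<String, Long> kv) -> new Instant(kv.getValue() * 1000L)))
        .apply(
            Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(5)))
                // Element-count trigger: meaningful even without a wall-clock flow.
                // Batch runners (this translator included) may ignore it.
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        .apply(GroupByKey.create());
    p.run().waitUntilFinish();
  }
}
```

Under the translation above, the trigger and allowed lateness are dropped, so each window fires exactly once with its final result.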
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]