This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 89df2bf87c7dc926ff9c0a4cb38b83fed1a545a1 Author: Etienne Chauchot <[email protected]> AuthorDate: Mon May 27 15:10:02 2019 +0200 Re-code GroupByKeyTranslatorBatch to conserve windowing instead of unwindowing/windowing(GlobalWindow): simplify code, use ReduceFnRunner to merge the windows --- .../batch/GroupByKeyTranslatorBatch.java | 74 ++++++--- .../GroupAlsoByWindowViaOutputBufferFn.java | 168 +++++++++++++++++++++ 2 files changed, 217 insertions(+), 25 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 48cceee..b2b4441 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -17,16 +17,23 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; -import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; @@ -42,34 +49,51 @@ class GroupByKeyTranslatorBatch<K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform, TranslationContext context) { - Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput()); + @SuppressWarnings("unchecked") + final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput(); - // Extract key to group by key only. - KeyValueGroupedDataset<K, KV<K, V>> grouped = - input - .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder()) - .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.genericEncoder()); + Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection); - // Materialize grouped values, potential OOM because of creation of new iterable - Dataset<KV<K, Iterable<V>>> materialized = - grouped.mapGroups( - (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>) - // TODO: We need to improve this part and avoid creating of new List (potential OOM) - // (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)), - (key, iterator) -> { - List<V> values = new ArrayList<>(); - while (iterator.hasNext()) { - values.add(iterator.next().getValue()); - } - return KV.of(key, Iterables.unmodifiableIterable(values)); - }, - EncoderHelpers.kvEncoder()); + //group by key only + KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input + .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(), + EncoderHelpers.genericEncoder()); - // Window the result into global window. - Dataset<WindowedValue<KV<K, Iterable<V>>>> output = - materialized.map( - WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder()); + // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable + Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups( + (MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>) (key, iterator) -> { + List<WindowedValue<V>> values = new ArrayList<>(); + while (iterator.hasNext()) { + WindowedValue<KV<K, V>> next = iterator.next(); + values.add(WindowedValue + .of(next.getValue().getValue(), next.getTimestamp(), next.getWindows(), + next.getPane())); + } + KV<K, Iterable<WindowedValue<V>>> kv = KV.of(key, Iterables.unmodifiableIterable(values)); + return kv; + }, EncoderHelpers.kvEncoder()); + + WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy(); + KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder(); + // group also by windows + Dataset<WindowedValue<KV<K, Iterable<V>>>> output = materialized.flatMap( + new GroupAlsoByWindowViaOutputBufferFn<>(windowingStrategy, + new InMemoryStateInternalsFactory<>(), SystemReduceFn.buffering(coder.getValueCoder()), + context.getSerializableOptions()), EncoderHelpers.windowedValueEncoder()); context.putDataset(context.getOutput(), output); } + + /** + * In-memory state internals factory. + * + * @param <K> State key type. + */ + static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable { + @Override + public StateInternals stateInternalsForKey(K key) { + return InMemoryStateInternals.forKey(key); + } + } + } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java new file mode 100644 index 0000000..2fb08f5 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.runners.core.InMemoryTimerInternals; +import org.apache.beam.runners.core.OutputWindowedValue; +import org.apache.beam.runners.core.ReduceFnRunner; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.UnsupportedSideInputReader; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.core.construction.TriggerTranslation; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.joda.time.Instant; + +/** A FlatMap function that groups by windows in batch mode using {@link ReduceFnRunner}. */ +public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow> + implements FlatMapFunction< + KV<K, Iterable<WindowedValue<InputT>>>, + WindowedValue<KV<K, Iterable<InputT>>>> { + + private final WindowingStrategy<?, W> windowingStrategy; + private final StateInternalsFactory<K> stateInternalsFactory; + private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn; + private final SerializablePipelineOptions options; + + public GroupAlsoByWindowViaOutputBufferFn( + WindowingStrategy<?, W> windowingStrategy, + StateInternalsFactory<K> stateInternalsFactory, + SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn, + SerializablePipelineOptions options) { + this.windowingStrategy = windowingStrategy; + this.stateInternalsFactory = stateInternalsFactory; + this.reduceFn = reduceFn; + this.options = options; + } + + @Override + public Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call( + KV<K, Iterable<WindowedValue<InputT>>> kv) throws Exception { + K key = kv.getKey(); + Iterable<WindowedValue<InputT>> values = kv.getValue(); + + // ------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------// + + // Used with Batch, we know that all the data is available for this key. We can't use the + // timer manager from the context because it doesn't exist. So we create one and emulate the + // watermark, knowing that we have all data and it is in timestamp order. + InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + timerInternals.advanceProcessingTime(Instant.now()); + timerInternals.advanceSynchronizedProcessingTime(Instant.now()); + StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); + GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>(); + + ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner = + new ReduceFnRunner<>( + key, + windowingStrategy, + ExecutableTriggerStateMachine.create( + TriggerStateMachines.stateMachineForTrigger( + TriggerTranslation.toProto(windowingStrategy.getTrigger()))), + stateInternals, + timerInternals, + outputter, + new UnsupportedSideInputReader("GroupAlsoByWindow"), + reduceFn, + options.get()); + + // Process the grouped values. + reduceFnRunner.processElements(values); + + // Finish any pending windows by advancing the input watermark to infinity. + timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + + // not supported yet +/* + // Finally, advance the processing time to infinity to fire any timers. + timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + + fireEligibleTimers(timerInternals, reduceFnRunner); +*/ + + reduceFnRunner.persist(); + + return outputter.getOutputs().iterator(); + } + +/* private void fireEligibleTimers( + InMemoryTimerInternals timerInternals, + ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) + throws Exception { + List<TimerInternals.TimerData> timers = new ArrayList<>(); + while (true) { + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextEventTimer()) != null) { + timers.add(timer); + } + while ((timer = timerInternals.removeNextProcessingTimer()) != null) { + timers.add(timer); + } + while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { + timers.add(timer); + } + if (timers.isEmpty()) { + break; + } + reduceFnRunner.onTimers(timers); + timers.clear(); + } + }*/ + + private static class GABWOutputWindowedValue<K, V> + implements OutputWindowedValue<KV<K, Iterable<V>>> { + private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>(); + + @Override + public void outputWindowedValue( + KV<K, Iterable<V>> output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputs.add(WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <AdditionalOutputT> void outputWindowedValue( + TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs."); + } + + Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() { + return outputs; + } + } +}
