This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit c5e78a0f4552a094ba3914ef490629e136ac1beb Author: Etienne Chauchot <[email protected]> AuthorDate: Tue Oct 1 17:52:32 2019 +0200 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner --- .../translation/batch/ParDoTranslatorBatch.java | 42 +++++++++------ .../translation/helpers/EncoderHelpers.java | 6 ++- .../translation/helpers/MultiOuputCoder.java | 49 +++++++++++++++++ .../translation/helpers/Tuple2Coder.java | 62 ---------------------- 4 files changed, 81 insertions(+), 78 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 255adc8..f5a109e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -31,8 +31,10 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; @@ -84,12 +86,15 @@ class ParDoTranslatorBatch<InputT, OutputT> ParDoTranslation.getSchemaInformation(context.getCurrentTransform()); // Init main variables - Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput()); + PValue input = context.getInput(); + Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(input); Map<TupleTag<?>, PValue> outputs = context.getOutputs(); TupleTag<?> mainOutputTag = getTupleTag(context); List<TupleTag<?>> outputTags = new ArrayList<>(outputs.keySet()); WindowingStrategy<?, ?> windowingStrategy = - ((PCollection<InputT>) context.getInput()).getWindowingStrategy(); + ((PCollection<InputT>) input).getWindowingStrategy(); + Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder(); + Coder<? extends BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder(); // construct a map from side input to WindowingStrategy so that // the DoFn runner can map main-input windows to side input windows @@ -102,8 +107,6 @@ class ParDoTranslatorBatch<InputT, OutputT> SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context); Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders(); - Coder<InputT> inputCoder = ((PCollection<InputT>) context.getInput()).getCoder(); - MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); List<TupleTag<?>> additionalOutputTags = new ArrayList<>(); @@ -129,19 +132,25 @@ class ParDoTranslatorBatch<InputT, OutputT> broadcastStateData, doFnSchemaInformation); + MultiOuputCoder multipleOutputCoder = MultiOuputCoder + .of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder); Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs = - inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); + inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder)); if (outputs.entrySet().size() > 1) { allOutputs.persist(); for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { - pruneOutputFilteredByTag(context, allOutputs, output); + pruneOutputFilteredByTag(context, allOutputs, output, windowCoder); } } else { + Coder<OutputT> outputCoder = ((PCollection<OutputT>) outputs.get(mainOutputTag)).getCoder(); + Coder<WindowedValue<?>> windowedValueCoder = + (Coder<WindowedValue<?>>) + (Coder<?>) WindowedValue.getFullCoder(outputCoder, windowCoder); Dataset<WindowedValue<?>> outputDataset = allOutputs.map( (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } } @@ -152,14 +161,14 @@ class ParDoTranslatorBatch<InputT, OutputT> JavaSparkContext.fromSparkContext(context.getSparkSession().sparkContext()); SideInputBroadcast sideInputBroadcast = new SideInputBroadcast(); - for (PCollectionView<?> input : sideInputs) { + for (PCollectionView<?> sideInput : sideInputs) { Coder<? extends BoundedWindow> windowCoder = - input.getPCollection().getWindowingStrategy().getWindowFn().windowCoder(); + sideInput.getPCollection().getWindowingStrategy().getWindowFn().windowCoder(); + Coder<WindowedValue<?>> windowedValueCoder = (Coder<WindowedValue<?>>) - (Coder<?>) WindowedValue.getFullCoder(input.getPCollection().getCoder(), windowCoder); - - Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(input); + (Coder<?>) WindowedValue.getFullCoder(sideInput.getPCollection().getCoder(), windowCoder); + Dataset<WindowedValue<?>> broadcastSet = context.getSideInputDataSet(sideInput); List<WindowedValue<?>> valuesList = broadcastSet.collectAsList(); List<byte[]> codedValues = new ArrayList<>(); for (WindowedValue<?> v : valuesList) { @@ -167,7 +176,7 @@ class ParDoTranslatorBatch<InputT, OutputT> } sideInputBroadcast.add( - input.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder); + sideInput.getTagInternal().getId(), jsc.broadcast(codedValues), windowedValueCoder); } return sideInputBroadcast; } @@ -206,14 +215,17 @@ class ParDoTranslatorBatch<InputT, OutputT> private void pruneOutputFilteredByTag( TranslationContext context, Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs, - Map.Entry<TupleTag<?>, PValue> output) { + Map.Entry<TupleTag<?>, PValue> output, Coder<? extends BoundedWindow> windowCoder) { Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset = allOutputs.filter(new DoFnFilterFunction(output.getKey())); + Coder<WindowedValue<?>> windowedValueCoder = + (Coder<WindowedValue<?>>) + (Coder<?>) WindowedValue.getFullCoder(((PCollection<OutputT>)output.getValue()).getCoder(), windowCoder); Dataset<WindowedValue<?>> outputDataset = filteredDataset.map( (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDatasetWildcard(output.getValue(), outputDataset); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 218dc0a..a4f0320 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -81,10 +81,14 @@ public class EncoderHelpers { return Encoders.kryo((Class<T>) Object.class); } - /** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo */ +/* + */ +/** Get a bytes {@link Encoder} for {@link Tuple2}. Bytes serialisation is issued by Kryo *//* + public static <T1, T2> Encoder<Tuple2<T1, T2>> tuple2Encoder() { return Encoders.tuple(EncoderHelpers.genericEncoder(), EncoderHelpers.genericEncoder()); } +*/ /* --------- Bridges from Beam Coders to Spark Encoders diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java new file mode 100644 index 0000000..caaea01 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/MultiOuputCoder.java @@ -0,0 +1,49 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import scala.Tuple2; + +public class MultiOuputCoder<T> extends CustomCoder<Tuple2<TupleTag<T>, WindowedValue<T>>> { + Coder<TupleTag> tupleTagCoder; + Map<TupleTag<?>, Coder<?>> coderMap; + Coder<? extends BoundedWindow> windowCoder; + + public static MultiOuputCoder of(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) { + return new MultiOuputCoder(tupleTagCoder, coderMap, windowCoder); + } + + private MultiOuputCoder(Coder<TupleTag> tupleTagCoder, Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) { + this.tupleTagCoder = tupleTagCoder; + this.coderMap = coderMap; + this.windowCoder = windowCoder; + } + + @Override public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple2, OutputStream outStream) + throws IOException { + TupleTag<T> tupleTag = tuple2._1(); + tupleTagCoder.encode(tupleTag, outStream); + Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag); + WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder + .of(valueCoder, windowCoder); + wvCoder.encode(tuple2._2(), outStream); + } + + @Override public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream inStream) + throws CoderException, IOException { + TupleTag<T> tupleTag = (TupleTag<T>) tupleTagCoder.decode(inStream); + Coder<T> valueCoder = (Coder<T>)coderMap.get(tupleTag); + WindowedValue.FullWindowedValueCoder<T> wvCoder = WindowedValue.FullWindowedValueCoder + .of(valueCoder, windowCoder); + WindowedValue<T> wv = wvCoder.decode(inStream); + return Tuple2.apply(tupleTag, wv); + } +} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java deleted file mode 100644 index 1743a01..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StructuredCoder; -import scala.Tuple2; - -/** - * Beam coder to encode/decode Tuple2 scala types. - * @param <T1> first field type parameter - * @param <T2> second field type parameter - */ -public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> { - private final Coder<T1> firstFieldCoder; - private final Coder<T2> secondFieldCoder; - - public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) { - return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder); - } - - private Tuple2Coder(Coder<T1> firstFieldCoder, Coder<T2> secondFieldCoder) { - this.firstFieldCoder = firstFieldCoder; - this.secondFieldCoder = secondFieldCoder; - } - - - @Override public void encode(Tuple2<T1, T2> value, OutputStream outStream) - throws IOException { - firstFieldCoder.encode(value._1(), outStream); - secondFieldCoder.encode(value._2(), outStream); - } - - @Override public Tuple2<T1, T2> decode(InputStream inStream) throws IOException { - T1 firstField = firstFieldCoder.decode(inStream); - T2 secondField = secondFieldCoder.decode(inStream); - return Tuple2.apply(firstField, secondField); - } - - @Override public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(firstFieldCoder, secondFieldCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder); - verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder); - } - - /** Returns the coder for first field. */ - public Coder<T1> getFirstFieldCoder() { - return firstFieldCoder; - } - - /** Returns the coder for second field. */ - public Coder<T2> getSecondFieldCoder() { - return secondFieldCoder; - } -}
