This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2dc923b0aeb627d0db5d7b2a41f56865659fd629 Author: David Moravek <[email protected]> AuthorDate: Thu Nov 7 15:39:14 2019 +0100 [BEAM-8577] Initialize FileSystems during Coder deserialization in Reshuffle reduce phase. --- .../translation/types/CoderTypeSerializer.java | 19 ++++++++++ .../translation/types/CoderTypeSerializer.java | 19 ++++++++++ .../flink/FlinkBatchTransformTranslators.java | 18 +++++++--- .../functions/FlinkIdentityFunction.java | 42 ++++++++++++++++++++++ .../translation/types/CoderTypeInformation.java | 26 ++++++++++++-- 5 files changed, 118 insertions(+), 6 deletions(-) diff --git a/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index e29f97e..807faf5 100644 --- a/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.types; import java.io.EOFException; import java.io.IOException; import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -40,9 +42,26 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { private final Coder<T> coder; + /** + * {@link SerializablePipelineOptions} deserialization will cause {@link + * org.apache.beam.sdk.io.FileSystems} registration needed for {@link + * org.apache.beam.sdk.transforms.Reshuffle} translation. + */ + @SuppressWarnings("unused") + @Nullable + private final SerializablePipelineOptions pipelineOptions; + public CoderTypeSerializer(Coder<T> coder) { Preconditions.checkNotNull(coder); this.coder = coder; + this.pipelineOptions = null; + } + + public CoderTypeSerializer( + Coder<T> coder, @Nullable SerializablePipelineOptions pipelineOptions) { + Preconditions.checkNotNull(coder); + this.coder = coder; + this.pipelineOptions = pipelineOptions; } @Override diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index 2ff1cda..276e49c 100644 --- a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.flink.translation.types; import java.io.EOFException; import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -41,9 +43,26 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> { private final Coder<T> coder; + /** + * {@link SerializablePipelineOptions} deserialization will cause {@link + * org.apache.beam.sdk.io.FileSystems} registration needed for {@link + * org.apache.beam.sdk.transforms.Reshuffle} translation. + */ + @SuppressWarnings("unused") + @Nullable + private final SerializablePipelineOptions pipelineOptions; + public CoderTypeSerializer(Coder<T> coder) { Preconditions.checkNotNull(coder); this.coder = coder; + this.pipelineOptions = null; + } + + public CoderTypeSerializer( + Coder<T> coder, @Nullable SerializablePipelineOptions pipelineOptions) { + Preconditions.checkNotNull(coder); + this.coder = coder; + this.pipelineOptions = pipelineOptions; } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 229eca5..27c9fba 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkIdentityFunction; import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction; import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction; import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; @@ -84,6 +85,7 @@ import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.GroupCombineOperator; import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.operators.Grouping; +import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.api.java.operators.SingleInputUdfOperator; @@ -306,11 +308,19 @@ class FlinkBatchTransformTranslators { @Override public void translateNode( Reshuffle<K, InputT> transform, FlinkBatchTranslationContext context) { - - DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - - context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance()); + @SuppressWarnings("unchecked") + final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType = + ((CoderTypeInformation) inputDataSet.getType()) + .withPipelineOptions(context.getPipelineOptions()); + final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet = + new MapOperator<>( + inputDataSet, + outputType, + FlinkIdentityFunction.of(), + getCurrentTransformName(context)); + context.setOutputDataSet(context.getOutput(transform), retypedDataSet.rebalance()); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java new file mode 100644 index 0000000..be3db7c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.flink.api.common.functions.MapFunction; + +/** + * A map function that outputs the input element without any change. + * + * @param <T> Input element type. + */ +public class FlinkIdentityFunction<T> implements MapFunction<T, T> { + + private static FlinkIdentityFunction<?> INSTANCE = new FlinkIdentityFunction<>(); + + @SuppressWarnings("unchecked") + public static <T> FlinkIdentityFunction<T> of() { + return (FlinkIdentityFunction) INSTANCE; + } + + private FlinkIdentityFunction() {} + + @Override + public T map(T value) { + return value; + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java index c03bef9..5e76923 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java @@ -19,7 +19,10 @@ package org.apache.beam.runners.flink.translation.types; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -33,10 +36,18 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> { private final Coder<T> coder; + @Nullable private final SerializablePipelineOptions pipelineOptions; public CoderTypeInformation(Coder<T> coder) { checkNotNull(coder); this.coder = coder; + this.pipelineOptions = null; + } + + private CoderTypeInformation(Coder<T> coder, PipelineOptions pipelineOptions) { + checkNotNull(coder); + this.coder = coder; + this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); } public Coder<T> getCoder() { @@ -70,9 +81,8 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi } @Override - @SuppressWarnings("unchecked") public TypeSerializer<T> createSerializer(ExecutionConfig config) { - return new CoderTypeSerializer<>(coder); + return new CoderTypeSerializer<>(coder, pipelineOptions); } @Override @@ -80,6 +90,18 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi return 2; } + /** + * Creates a new {@link CoderTypeInformation} with {@link PipelineOptions}, that can be used for + * {@link org.apache.beam.sdk.io.FileSystems} registration. + * + * @see <a href="https://issues.apache.org/jira/browse/BEAM-8577">Jira issue.</a> + * @param pipelineOptions Options of current pipeline. + * @return New type information. + */ + public CoderTypeInformation<T> withPipelineOptions(PipelineOptions pipelineOptions) { + return new CoderTypeInformation<>(getCoder(), pipelineOptions); + } + @Override public boolean equals(Object o) { if (this == o) {
