This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 785609f22d013411b7973bbf9e2d15c3c8171fb2 Merge: c9c239f 2dc923b Author: Maximilian Michels <[email protected]> AuthorDate: Mon Jan 6 19:30:10 2020 +0100 Merge pull request #10027: [BEAM-8577] Initialize FileSystems in coder for Reshuffle .../translation/types/CoderTypeSerializer.java | 19 ++++++++++ .../translation/types/CoderTypeSerializer.java | 19 ++++++++++ .../flink/FlinkBatchTransformTranslators.java | 24 ++++++++++--- .../functions/FlinkIdentityFunction.java | 42 ++++++++++++++++++++++ .../translation/types/CoderTypeInformation.java | 26 ++++++++++++-- 5 files changed, 124 insertions(+), 6 deletions(-) diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 229eca5,27c9fba..28351d5 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@@ -306,11 -308,19 +308,25 @@@ class FlinkBatchTransformTranslators @Override public void translateNode( Reshuffle<K, InputT> transform, FlinkBatchTranslationContext context) { - - DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - - context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance()); ++ // Construct an instance of CoderTypeInformation which contains the pipeline options. ++ // This will be used to initialize FileSystems. + @SuppressWarnings("unchecked") + final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType = + ((CoderTypeInformation) inputDataSet.getType()) + .withPipelineOptions(context.getPipelineOptions()); ++ // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation. ++ // The output type coder may be relying on file system access. 
The shuffled data may have to ++ // be deserialized on a different machine using this coder where FileSystems has not been ++ // initialized. + final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet = + new MapOperator<>( + inputDataSet, + outputType, + FlinkIdentityFunction.of(), + getCurrentTransformName(context)); + context.setOutputDataSet(context.getOutput(transform), retypedDataSet.rebalance()); } } diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java index 0000000,be3db7c..f9128e7 mode 000000,100644..100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java @@@ -1,0 -1,42 +1,42 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.beam.runners.flink.translation.functions; + + import org.apache.flink.api.common.functions.MapFunction; + + /** + * A map function that outputs the input element without any change. + * + * @param <T> Input element type. 
+ */ + public class FlinkIdentityFunction<T> implements MapFunction<T, T> { + - private static FlinkIdentityFunction<?> INSTANCE = new FlinkIdentityFunction<>(); ++ private static final FlinkIdentityFunction<?> INSTANCE = new FlinkIdentityFunction<>(); + + @SuppressWarnings("unchecked") + public static <T> FlinkIdentityFunction<T> of() { + return (FlinkIdentityFunction) INSTANCE; + } + + private FlinkIdentityFunction() {} + + @Override + public T map(T value) { + return value; + } + }
