This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors in repository https://gitbox.apache.org/repos/asf/flink.git
commit b9f8384b76f45b21c8120526a5db88cddc8e336f Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Thu Nov 7 15:32:16 2019 +0100 [hotfix] Generalize and move config utils from flink-yarn to flink-core --- .../flink/client/cli/ExecutionConfigAccessor.java | 30 ++++-- .../client/cli/ExecutionConfigurationUtils.java | 86 ---------------- .../apache/flink/configuration/ConfigUtils.java | 114 +++++++++++++++++++++ .../apache/flink/yarn/YarnClusterDescriptor.java | 4 +- .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 3 +- .../org/apache/flink/yarn/cli/YarnConfigUtils.java | 76 -------------- 6 files changed, 137 insertions(+), 176 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index 5bb524d..ccfa2ca 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -20,6 +20,8 @@ package org.apache.flink.client.cli; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ExecutionOptions; @@ -69,7 +71,10 @@ public class ExecutionConfigAccessor { configuration.setBoolean(ExecutionOptions.ATTACHED, !options.getDetachedMode()); configuration.setBoolean(ExecutionOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit()); - parseClasspathURLsToConfig(options.getClasspaths(), configuration); + if (options.getClasspaths() != null) { + ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths().stream(), URL::toString); + } + parseJarURLToConfig(options.getJarFilePath(), configuration); SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration); @@ -77,13 +82,6 @@ public class ExecutionConfigAccessor { return new ExecutionConfigAccessor(configuration); } - private static void parseClasspathURLsToConfig(final List<URL> classpathURLs, final Configuration configuration) { - ExecutionConfigurationUtils.urlListToConfig( - classpathURLs, - configuration, - PipelineOptions.CLASSPATHS); - } - private static void parseJarURLToConfig(final String jarFile, final Configuration configuration) { if (jarFile == null) { return; @@ -92,7 +90,7 @@ public class ExecutionConfigAccessor { try { final URL jarUrl = new File(jarFile).getAbsoluteFile().toURI().toURL(); final List<URL> jarUrlSingleton = Collections.singletonList(jarUrl); - ExecutionConfigurationUtils.urlListToConfig(jarUrlSingleton, configuration, PipelineOptions.JARS); + ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.JARS, jarUrlSingleton.stream(), URL::toString); } catch (MalformedURLException e) { throw new IllegalArgumentException("JAR file path invalid", e); } @@ -103,7 +101,7 @@ public class ExecutionConfigAccessor { } public URL getJarFilePath() { - final List<URL> jarURL = ExecutionConfigurationUtils.urlListFromConfig(configuration, PipelineOptions.JARS); + final List<URL> jarURL = decodeUrlList(configuration, PipelineOptions.JARS); if (jarURL != null && !jarURL.isEmpty()) { return jarURL.get(0); } @@ -111,7 +109,17 @@ public class ExecutionConfigAccessor { } public List<URL> getClasspaths() { - return ExecutionConfigurationUtils.urlListFromConfig(configuration, PipelineOptions.CLASSPATHS); + return decodeUrlList(configuration, PipelineOptions.CLASSPATHS); + } + + private List<URL> decodeUrlList(final Configuration configuration, final ConfigOption<List<String>> configOption) { + return ConfigUtils.decodeListFromConfig(configuration, configOption, url -> { + try { + return new URL(url); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid URL", e); + } + }); } public int getParallelism() { diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigurationUtils.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigurationUtils.java deleted file mode 100644 index e25c129..0000000 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigurationUtils.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client.cli; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; - -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Utilities for parsing parameters in the {@link ExecutionConfigAccessor}. - */ -@Internal -class ExecutionConfigurationUtils { - - /** - * Parses a list of {@link URL URLs} to a string and puts it in the provided {@code configuration} as the value of the provided {@code option}. - * @param urls the list of URLs to parse - * @param configuration the configuration object to put the list - * @param option the {@link ConfigOption option} to serve as the key for the list in the configuration - * @return the produced list of strings to be put in the configuration. - */ - static List<String> urlListToConfig( - final List<URL> urls, - final Configuration configuration, - final ConfigOption<List<String>> option) { - - checkNotNull(urls); - checkNotNull(configuration); - checkNotNull(option); - - final List<String> str = urls.stream().map(URL::toString).collect(Collectors.toList()); - configuration.set(option, str); - return str; - } - - /** - * Parses a string into a list of {@link URL URLs} from a given {@link Configuration}. - * @param configuration the configuration containing the string-ified list of URLs - * @param option the {@link ConfigOption option} whose value is the list of URLs - * @return the produced list of URLs. - */ - static List<URL> urlListFromConfig( - final Configuration configuration, - final ConfigOption<List<String>> option) { - - checkNotNull(configuration); - checkNotNull(option); - - final List<String> urls = configuration.get(option); - if (urls == null || urls.isEmpty()) { - return Collections.emptyList(); - } - - return urls.stream().map(str -> { - try { - return new URL(str); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Invalid URL", e); - } - }).collect(Collectors.toList()); - } -} diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java new file mode 100644 index 0000000..4a38728 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.Internal; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * General utilities for parsing values to configuration options. + */ +@Internal +public class ConfigUtils { + + /** + * Puts an array of values of type {@code IN} in a {@link Configuration} + * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. + * + * @param configuration the configuration object to put the list in + * @param key the {@link ConfigOption option} to serve as the key for the list in the configuration + * @param value the array of values to put as value for the {@code key} + * @param mapper the transformation function from {@code IN} to {@code OUT}. + */ + public static <IN, OUT> void encodeArrayToConfig( + final Configuration configuration, + final ConfigOption<List<OUT>> key, + final IN[] value, + final Function<IN, OUT> mapper) { + if (value == null) { + return; + } + encodeStreamToConfig(configuration, key, Arrays.stream(value), mapper); + } + + /** + * Puts a {@link Stream} of values of type {@code IN} in a {@link Configuration} + * as a {@link ConfigOption} of type {@link List} of type {@code OUT}. + * + * @param configuration the configuration object to put the list in + * @param key the {@link ConfigOption option} to serve as the key for the list in the configuration + * @param values the stream of values to put as value for the {@code key} + * @param mapper the transformation function from {@code IN} to {@code OUT}. + */ + public static <IN, OUT> void encodeStreamToConfig( + final Configuration configuration, + final ConfigOption<List<OUT>> key, + final Stream<IN> values, + final Function<IN, OUT> mapper) { + + checkNotNull(configuration); + checkNotNull(key); + checkNotNull(mapper); + + if (values == null) { + return; + } + + final List<OUT> encodedOption = values.map(mapper).filter(Objects::nonNull).collect(Collectors.toList()); + if (!encodedOption.isEmpty()) { + configuration.set(key, encodedOption); + } + } + + /** + * Gets a {@link List} of values of type {@code IN} from a {@link Configuration} + * and transforms it to a {@link List} of type {@code OUT} based on the provided {@code mapper} function. + * + * @param configuration the configuration object to get the value out of + * @param key the {@link ConfigOption option} to serve as the key for the list in the configuration + * @param mapper the transformation function from {@code IN} to {@code OUT}. + * @return the transformed values in a list of type {@code OUT}. + */ + public static <IN, OUT> List<OUT> decodeListFromConfig( + final Configuration configuration, + final ConfigOption<List<IN>> key, + final Function<IN, OUT> mapper) { + + checkNotNull(configuration); + checkNotNull(key); + checkNotNull(mapper); + + final List<IN> encodedString = configuration.get(key); + return encodedString != null + ? encodedString.stream().map(mapper).collect(Collectors.toList()) + : Collections.emptyList(); + } + + private ConfigUtils() { + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 1d60249..a9a0ac4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ExecutionOptions; @@ -48,7 +49,6 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; -import org.apache.flink.yarn.cli.YarnConfigUtils; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint; @@ -185,7 +185,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { private Optional<List<File>> decodeDirsToShipToCluster(final Configuration configuration) { checkNotNull(configuration); - final List<File> files = YarnConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new); + final List<File> files = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new); return files.isEmpty() ? Optional.empty() : Optional.of(files); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 90fcbfc..a229170 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ExecutionMode; @@ -286,7 +287,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { checkNotNull(configuration); if (cmd.hasOption(shipPath.getOpt())) { - YarnConfigUtils.encodeListToConfig( + ConfigUtils.encodeArrayToConfig( configuration, YarnConfigOptions.SHIP_DIRECTORIES, cmd.getOptionValues(this.shipPath.getOpt()), diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnConfigUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnConfigUtils.java deleted file mode 100644 index 3012d7c..0000000 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnConfigUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn.cli; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Utilities for parsing {@link org.apache.flink.configuration.ConfigOption configuration options}. - */ -public class YarnConfigUtils { - - public static <T> void encodeListToConfig( - final Configuration configuration, - final ConfigOption<List<String>> key, - final T[] value, - final Function<T, String> mapper) { - encodeListToConfig(configuration, key, Arrays.stream(value), mapper); - } - - private static <T> void encodeListToConfig( - final Configuration configuration, - final ConfigOption<List<String>> key, - final Stream<T> values, - final Function<T, String> mapper) { - - checkNotNull(values); - checkNotNull(key); - checkNotNull(configuration); - - final List<String> encodedString = values.map(mapper).filter(Objects::nonNull).collect(Collectors.toList()); - if (!encodedString.isEmpty()) { - configuration.set(key, encodedString); - } - } - - public static <R> List<R> decodeListFromConfig( - final Configuration configuration, - final ConfigOption<List<String>> key, - final Function<String, R> mapper) { - - checkNotNull(configuration); - checkNotNull(key); - - final List<String> encodedString = configuration.get(key); - return encodedString != null - ? encodedString.stream().map(mapper).collect(Collectors.toList()) - : Collections.emptyList(); - } -}