This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 186a2dd6559653fc9573b9cdb5221347417b65a8 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Apr 26 15:46:12 2019 +0200 [FLINK-12143][dist] Ship plugins in the cluster the same way as lib jars --- .../org/apache/flink/client/cli/CliFrontend.java | 4 +- .../flink/configuration/GlobalConfiguration.java | 25 ++++++++++++ .../plugin/{PluginUtils.java => PluginConfig.java} | 46 ++++++++++++---------- .../org/apache/flink/core/plugin/PluginUtils.java | 23 +++++------ .../configuration/GlobalConfigurationTest.java | 2 +- .../mesos/entrypoint/MesosTaskExecutorRunner.java | 4 +- .../runtime/webmonitor/history/HistoryServer.java | 4 +- .../runtime/entrypoint/ClusterEntrypoint.java | 4 +- .../runtime/taskexecutor/TaskManagerRunner.java | 4 +- .../table/client/gateway/local/LocalExecutor.java | 4 +- .../flink/yarn/AbstractYarnClusterDescriptor.java | 6 +-- .../apache/flink/yarn/YarnTaskExecutorRunner.java | 4 +- 12 files changed, 73 insertions(+), 57 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 06dd761..ec67fff 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -80,7 +80,6 @@ import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -124,8 +123,7 @@ public class CliFrontend { this.configuration = Preconditions.checkNotNull(configuration); this.customCommandLines = Preconditions.checkNotNull(customCommandLines); - //TODO provide plugin path. - FileSystem.initialize(this.configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); this.customCommandLineOptions = new Options(); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index 564fe19..c0debab 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -130,9 +130,34 @@ public final class GlobalConfiguration { configuration.addAll(dynamicProperties); } + return enrichWithEnvironmentVariables(configuration); + } + + private static Configuration enrichWithEnvironmentVariables(Configuration configuration) { + enrichWithEnvironmentVariable(ConfigConstants.ENV_FLINK_PLUGINS_DIR, configuration); return configuration; } + private static void enrichWithEnvironmentVariable(String environmentVariable, Configuration configuration) { + String pluginsDirFromEnv = System.getenv(environmentVariable); + + if (pluginsDirFromEnv == null) { + return; + } + + String pluginsDirFromConfig = configuration.getString(environmentVariable, pluginsDirFromEnv); + + if (!pluginsDirFromEnv.equals(pluginsDirFromConfig)) { + throw new IllegalConfigurationException( + "The given configuration file already contains a value (" + pluginsDirFromEnv + + ") for the key (" + environmentVariable + + ") that would have been overwritten with (" + pluginsDirFromConfig + + ") by an environment with the same name."); + } + + configuration.setString(environmentVariable, pluginsDirFromEnv); + } + /** * Loads a YAML-file of key-value pairs. * diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java similarity index 51% copy from flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java copy to flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java index 17e2ae4..c4a3b2b 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java @@ -18,37 +18,41 @@ package org.apache.flink.core.plugin; -import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; -import java.io.IOException; +import java.io.File; import java.nio.file.Path; -import java.util.Collection; -import java.util.Collections; import java.util.Optional; /** - * Utility functions for the plugin mechanism. + * Stores the configuration for plugins mechanism. */ -public final class PluginUtils { +public class PluginConfig { + private final Optional<Path> pluginsPath; - private PluginUtils() { - throw new AssertionError("Singleton class."); + private PluginConfig() { + this.pluginsPath = Optional.empty(); } - public static PluginManager createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) { - Collection<PluginDescriptor> pluginDescriptorsForDirectory; - - if (pluginsRootPath.isPresent()) { - try { - pluginDescriptorsForDirectory = - new DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins(); - } catch (IOException e) { - throw new FlinkRuntimeException("Exception when trying to initialize plugin system.", e); - } - } else { - pluginDescriptorsForDirectory = Collections.emptyList(); + private PluginConfig(Path pluginsPath) { + this.pluginsPath = Optional.of(pluginsPath); + } + + public Optional<Path> getPluginsPath() { + return pluginsPath; + } + + public static PluginConfig fromConfiguration(Configuration configuration) { + String pluginsDir = configuration.getString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, null); + if (pluginsDir == null) { + return new PluginConfig(); } - return new PluginManager(pluginDescriptorsForDirectory); + File pluginsDirFile = new File(pluginsDir); + if (!pluginsDirFile.isDirectory()) { + return new PluginConfig(); + } + return new PluginConfig(pluginsDirFile.toPath()); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java index 17e2ae4..0f9f0d3 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java @@ -18,13 +18,12 @@ package org.apache.flink.core.plugin; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.FlinkRuntimeException; import java.io.IOException; -import java.nio.file.Path; import java.util.Collection; import java.util.Collections; -import java.util.Optional; /** * Utility functions for the plugin mechanism. @@ -35,20 +34,22 @@ public final class PluginUtils { throw new AssertionError("Singleton class."); } - public static PluginManager createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) { - Collection<PluginDescriptor> pluginDescriptorsForDirectory; + public static PluginManager createPluginManagerFromRootFolder(Configuration configuration) { + return createPluginManagerFromRootFolder(PluginConfig.fromConfiguration(configuration)); + } - if (pluginsRootPath.isPresent()) { + private static PluginManager createPluginManagerFromRootFolder(PluginConfig pluginConfig) { + if (pluginConfig.getPluginsPath().isPresent()) { try { - pluginDescriptorsForDirectory = - new DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins(); + Collection<PluginDescriptor> pluginDescriptors = + new DirectoryBasedPluginFinder(pluginConfig.getPluginsPath().get()).findPlugins(); + return new PluginManager(pluginDescriptors); } catch (IOException e) { throw new FlinkRuntimeException("Exception when trying to initialize plugin system.", e); } - } else { - pluginDescriptorsForDirectory = Collections.emptyList(); } - - return new PluginManager(pluginDescriptorsForDirectory); + else { + return new PluginManager(Collections.emptyList()); + } } } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java index a42564d..94bb872 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java @@ -97,7 +97,7 @@ public class GlobalConfigurationTest extends TestLogger { @Test(expected = IllegalArgumentException.class) public void testFailIfNull() { - GlobalConfiguration.loadConfiguration(null); + GlobalConfiguration.loadConfiguration((String) null); } @Test(expected = IllegalConfigurationException.class) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java index e2d0161..4c0ec97 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; -import java.util.Optional; /** * The entry point for running a TaskManager in a Mesos container. @@ -87,8 +86,7 @@ public class MesosTaskExecutorRunner { final Map<String, String> envs = System.getenv(); // configure the filesystems - //TODO provide plugin path. - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); // tell akka to die in case of an error configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 12a5b56..e907836 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -58,7 +58,6 @@ import java.nio.file.Files; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -111,8 +110,7 @@ public class HistoryServer { LOG.info("Loading configuration from {}", configDir); final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); - //TODO provide plugin path. - FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig)); // run the history server SecurityUtils.install(new SecurityConfiguration(flinkConfig)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index d6dc724..0538bec 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -77,7 +77,6 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collection; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -187,8 +186,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro private void configureFileSystems(Configuration configuration) { LOG.info("Install default filesystem."); - //TODO provide plugin path - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); } protected SecurityContext installSecurityContext(Configuration configuration) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index d5985b1..2b4e0f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -70,7 +70,6 @@ import java.lang.reflect.UndeclaredThrowableException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -285,8 +284,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync final Configuration configuration = loadConfiguration(args); - //TODO provide plugin path. - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); SecurityUtils.install(new SecurityConfiguration(configuration)); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 158c261..99939bd 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -69,7 +69,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; /** * Executor that performs the Flink communication locally. The calls are blocking depending on the @@ -113,8 +112,7 @@ public class LocalExecutor implements Executor { this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir); // initialize default file system - //TODO provide plugin path. - FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig)); // load command lines for deployment this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index ebf470c..8b51968 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -97,7 +97,6 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -660,8 +659,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // ------------------ Initialize the file systems ------------------------- - //TODO provide plugin path. - org.apache.flink.core.fs.FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + org.apache.flink.core.fs.FileSystem.initialize( + configuration, + PluginUtils.createPluginManagerFromRootFolder(configuration)); // initialize file system // Copy the application master jar to the filesystem diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index d1a3700..bcb87e1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -46,7 +46,6 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Callable; /** @@ -95,8 +94,7 @@ public class YarnTaskExecutorRunner { final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); - //TODO provide path. - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);
