This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 186a2dd6559653fc9573b9cdb5221347417b65a8
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 26 15:46:12 2019 +0200

    [FLINK-12143][dist] Ship plugins in the cluster the same way as lib jars
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  4 +-
 .../flink/configuration/GlobalConfiguration.java   | 25 ++++++++++++
 .../plugin/{PluginUtils.java => PluginConfig.java} | 46 ++++++++++++----------
 .../org/apache/flink/core/plugin/PluginUtils.java  | 23 +++++------
 .../configuration/GlobalConfigurationTest.java     |  2 +-
 .../mesos/entrypoint/MesosTaskExecutorRunner.java  |  4 +-
 .../runtime/webmonitor/history/HistoryServer.java  |  4 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  4 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../table/client/gateway/local/LocalExecutor.java  |  4 +-
 .../flink/yarn/AbstractYarnClusterDescriptor.java  |  6 +--
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |  4 +-
 12 files changed, 73 insertions(+), 57 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 06dd761..ec67fff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -80,7 +80,6 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -124,8 +123,7 @@ public class CliFrontend {
                this.configuration = Preconditions.checkNotNull(configuration);
                this.customCommandLines = 
Preconditions.checkNotNull(customCommandLines);
 
-               //TODO provide plugin path.
-               FileSystem.initialize(this.configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
 
                this.customCommandLineOptions = new Options();
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 564fe19..c0debab 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -130,9 +130,34 @@ public final class GlobalConfiguration {
                        configuration.addAll(dynamicProperties);
                }
 
+               return enrichWithEnvironmentVariables(configuration);
+       }
+
+       private static Configuration 
enrichWithEnvironmentVariables(Configuration configuration) {
+               
enrichWithEnvironmentVariable(ConfigConstants.ENV_FLINK_PLUGINS_DIR, 
configuration);
                return configuration;
        }
 
+       private static void enrichWithEnvironmentVariable(String 
environmentVariable, Configuration configuration) {
+               String pluginsDirFromEnv = System.getenv(environmentVariable);
+
+               if (pluginsDirFromEnv == null) {
+                       return;
+               }
+
+               String pluginsDirFromConfig = 
configuration.getString(environmentVariable, pluginsDirFromEnv);
+
+               if (!pluginsDirFromEnv.equals(pluginsDirFromConfig)) {
+                       throw new IllegalConfigurationException(
+                               "The given configuration file already contains 
a value (" + pluginsDirFromEnv +
+                                       ") for the key (" + environmentVariable 
+
+                                       ") that would have been overwritten 
with (" + pluginsDirFromConfig +
+                                       ") by an environment with the same 
name.");
+               }
+
+               configuration.setString(environmentVariable, pluginsDirFromEnv);
+       }
+
        /**
         * Loads a YAML-file of key-value pairs.
         *
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
similarity index 51%
copy from flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
copy to flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
index 17e2ae4..c4a3b2b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginConfig.java
@@ -18,37 +18,41 @@
 
 package org.apache.flink.core.plugin;
 
-import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 
-import java.io.IOException;
+import java.io.File;
 import java.nio.file.Path;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 
 /**
- * Utility functions for the plugin mechanism.
+ * Stores the configuration for plugins mechanism.
  */
-public final class PluginUtils {
+public class PluginConfig {
+       private final Optional<Path> pluginsPath;
 
-       private PluginUtils() {
-               throw new AssertionError("Singleton class.");
+       private PluginConfig() {
+               this.pluginsPath = Optional.empty();
        }
 
-       public static PluginManager 
createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) {
-               Collection<PluginDescriptor> pluginDescriptorsForDirectory;
-
-               if (pluginsRootPath.isPresent()) {
-                       try {
-                               pluginDescriptorsForDirectory =
-                                       new 
DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins();
-                       } catch (IOException e) {
-                               throw new FlinkRuntimeException("Exception when 
trying to initialize plugin system.", e);
-                       }
-               } else {
-                       pluginDescriptorsForDirectory = Collections.emptyList();
+       private PluginConfig(Path pluginsPath) {
+               this.pluginsPath = Optional.of(pluginsPath);
+       }
+
+       public Optional<Path> getPluginsPath() {
+               return pluginsPath;
+       }
+
+       public static PluginConfig fromConfiguration(Configuration 
configuration) {
+               String pluginsDir = 
configuration.getString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, null);
+               if (pluginsDir == null) {
+                       return new PluginConfig();
                }
 
-               return new PluginManager(pluginDescriptorsForDirectory);
+               File pluginsDirFile = new File(pluginsDir);
+               if (!pluginsDirFile.isDirectory()) {
+                       return new PluginConfig();
+               }
+               return new PluginConfig(pluginsDirFile.toPath());
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
index 17e2ae4..0f9f0d3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.core.plugin;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Optional;
 
 /**
  * Utility functions for the plugin mechanism.
@@ -35,20 +34,22 @@ public final class PluginUtils {
                throw new AssertionError("Singleton class.");
        }
 
-       public static PluginManager 
createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) {
-               Collection<PluginDescriptor> pluginDescriptorsForDirectory;
+       public static PluginManager 
createPluginManagerFromRootFolder(Configuration configuration) {
+               return 
createPluginManagerFromRootFolder(PluginConfig.fromConfiguration(configuration));
+       }
 
-               if (pluginsRootPath.isPresent()) {
+       private static PluginManager 
createPluginManagerFromRootFolder(PluginConfig pluginConfig) {
+               if (pluginConfig.getPluginsPath().isPresent()) {
                        try {
-                               pluginDescriptorsForDirectory =
-                                       new 
DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins();
+                               Collection<PluginDescriptor> pluginDescriptors =
+                                       new 
DirectoryBasedPluginFinder(pluginConfig.getPluginsPath().get()).findPlugins();
+                               return new PluginManager(pluginDescriptors);
                        } catch (IOException e) {
                                throw new FlinkRuntimeException("Exception when 
trying to initialize plugin system.", e);
                        }
-               } else {
-                       pluginDescriptorsForDirectory = Collections.emptyList();
                }
-
-               return new PluginManager(pluginDescriptorsForDirectory);
+               else {
+                       return new PluginManager(Collections.emptyList());
+               }
        }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index a42564d..94bb872 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -97,7 +97,7 @@ public class GlobalConfigurationTest extends TestLogger {
 
        @Test(expected = IllegalArgumentException.class)
        public void testFailIfNull() {
-               GlobalConfiguration.loadConfiguration(null);
+               GlobalConfiguration.loadConfiguration((String) null);
        }
 
        @Test(expected = IllegalConfigurationException.class)
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index e2d0161..4c0ec97 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * The entry point for running a TaskManager in a Mesos container.
@@ -87,8 +86,7 @@ public class MesosTaskExecutorRunner {
                final Map<String, String> envs = System.getenv();
 
                // configure the filesystems
-               //TODO provide plugin path.
-               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
 
                // tell akka to die in case of an error
                configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 12a5b56..e907836 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -58,7 +58,6 @@ import java.nio.file.Files;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -111,8 +110,7 @@ public class HistoryServer {
                LOG.info("Loading configuration from {}", configDir);
                final Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration(configDir);
 
-               //TODO provide plugin path.
-               FileSystem.initialize(flinkConfig, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+               FileSystem.initialize(flinkConfig, 
PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
 
                // run the history server
                SecurityUtils.install(new SecurityConfiguration(flinkConfig));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index d6dc724..0538bec 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -77,7 +77,6 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -187,8 +186,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 
        private void configureFileSystems(Configuration configuration) {
                LOG.info("Install default filesystem.");
-               //TODO provide plugin path
-               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
        }
 
        protected SecurityContext installSecurityContext(Configuration 
configuration) throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index d5985b1..2b4e0f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -70,7 +70,6 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -285,8 +284,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
                final Configuration configuration = loadConfiguration(args);
 
-               //TODO provide plugin path.
-               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
 
                SecurityUtils.install(new SecurityConfiguration(configuration));
 
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 158c261..99939bd 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -69,7 +69,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
@@ -113,8 +112,7 @@ public class LocalExecutor implements Executor {
                        this.flinkConfig = 
GlobalConfiguration.loadConfiguration(flinkConfigDir);
 
                        // initialize default file system
-                       //TODO provide plugin path.
-                       FileSystem.initialize(flinkConfig, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+                       FileSystem.initialize(flinkConfig, 
PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
 
                        // load command lines for deployment
                        this.commandLines = 
CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ebf470c..8b51968 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -97,7 +97,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -660,8 +659,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
                // ------------------ Initialize the file systems 
-------------------------
 
-               //TODO provide plugin path.
-               org.apache.flink.core.fs.FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+               org.apache.flink.core.fs.FileSystem.initialize(
+                       configuration,
+                       
PluginUtils.createPluginManagerFromRootFolder(configuration));
 
                // initialize file system
                // Copy the application master jar to the filesystem
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index d1a3700..bcb87e1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -46,7 +46,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 
 /**
@@ -95,8 +94,7 @@ public class YarnTaskExecutorRunner {
 
                        final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
 
-                       //TODO provide path.
-                       FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
+                       FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
 
                        
setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);
 

Reply via email to