This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5dc213ea7a0bdbf021a10cd54ec452003bb5173
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Apr 24 18:23:56 2019 +0200

    [FLINK-12143] Distribute FLINK_PLUGINS_DIR the same way as FLINK_LIB_DIR is 
distributed
---
 flink-container/docker/Dockerfile                  |  1 +
 .../flink/configuration/ConfigConstants.java       |  3 +++
 flink-dist/src/main/flink-bin/bin/config.sh        |  2 ++
 flink-dist/src/main/flink-bin/bin/flink.bat        |  1 +
 .../src/main/flink-bin/bin/start-cluster.bat       |  1 +
 .../overlays/FlinkDistributionOverlay.java         | 12 +++++++---
 .../overlays/FlinkDistributionOverlayTest.java     | 25 ++++++++++++++-----
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 28 +++++++++++++++-------
 .../flink/yarn/YarnClusterDescriptorTest.java      | 22 ++++++++++-------
 9 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/flink-container/docker/Dockerfile 
b/flink-container/docker/Dockerfile
index 853ac36..129b5c5 100644
--- a/flink-container/docker/Dockerfile
+++ b/flink-container/docker/Dockerfile
@@ -25,6 +25,7 @@ RUN apk add --no-cache bash snappy libc6-compat
 ENV FLINK_INSTALL_PATH=/opt
 ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
 ENV FLINK_LIB_DIR $FLINK_HOME/lib
+ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins
 ENV FLINK_OPT_DIR $FLINK_HOME/opt
 ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts
 ENV PATH $PATH:$FLINK_HOME/bin
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a4de6cc..082ec56 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -2011,6 +2011,9 @@ public final class ConfigConstants {
        /** The environment variable name which contains the location of the 
opt directory. */
        public static final String ENV_FLINK_OPT_DIR = "FLINK_OPT_DIR";
 
+       /** The environment variable name which contains the location of the 
plugins folder. */
+       public static final String ENV_FLINK_PLUGINS_DIR = "FLINK_PLUGINS_DIR";
+
        /** The environment variable name which contains the location of the 
bin directory. */
        public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
 
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index e430e91..79fa6ad 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -298,6 +298,7 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
 # Define the main directory of the flink installation
 FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
 FLINK_LIB_DIR=$FLINK_HOME/lib
+FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
 FLINK_OPT_DIR=$FLINK_HOME/opt
 
 
@@ -314,6 +315,7 @@ YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
 ### Exported environment variables ###
 export FLINK_CONF_DIR
 export FLINK_BIN_DIR
+export FLINK_PLUGINS_DIR
 # export /lib dir to access it during deployment of the Yarn staging files
 export FLINK_LIB_DIR
 # export /opt dir to access it for the SQL client
diff --git a/flink-dist/src/main/flink-bin/bin/flink.bat 
b/flink-dist/src/main/flink-bin/bin/flink.bat
index 8ed1fcf..a88a4db 100644
--- a/flink-dist/src/main/flink-bin/bin/flink.bat
+++ b/flink-dist/src/main/flink-bin/bin/flink.bat
@@ -22,6 +22,7 @@ setlocal
 SET bin=%~dp0
 SET FLINK_HOME=%bin%..
 SET FLINK_LIB_DIR=%FLINK_HOME%\lib
+SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
 
 SET JVM_ARGS=-Xmx512m
 
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.bat 
b/flink-dist/src/main/flink-bin/bin/start-cluster.bat
index a852a55..9c01cb2 100644
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.bat
@@ -22,6 +22,7 @@ setlocal EnableDelayedExpansion
 SET bin=%~dp0
 SET FLINK_HOME=%bin%..
 SET FLINK_LIB_DIR=%FLINK_HOME%\lib
+SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
 SET FLINK_CONF_DIR=%FLINK_HOME%\conf
 SET FLINK_LOG_DIR=%FLINK_HOME%\log
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
index baebd8c..a430e0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java
@@ -21,17 +21,18 @@ package org.apache.flink.runtime.clusterframework.overlays;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
 import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
 import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR;
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -56,11 +57,13 @@ public class FlinkDistributionOverlay extends 
AbstractContainerOverlay {
        final File flinkBinPath;
        final File flinkConfPath;
        final File flinkLibPath;
+       final File flinkPluginsPath;
 
-       public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, 
File flinkLibPath) {
+       public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, 
File flinkLibPath, File flinkPluginsPath) {
                this.flinkBinPath = checkNotNull(flinkBinPath);
                this.flinkConfPath = checkNotNull(flinkConfPath);
                this.flinkLibPath = checkNotNull(flinkLibPath);
+               this.flinkPluginsPath = checkNotNull(flinkPluginsPath);
        }
 
        @Override
@@ -72,6 +75,7 @@ public class FlinkDistributionOverlay extends 
AbstractContainerOverlay {
                addPathRecursively(flinkBinPath, TARGET_ROOT, container);
                addPathRecursively(flinkConfPath, TARGET_ROOT, container);
                addPathRecursively(flinkLibPath, TARGET_ROOT, container);
+               addPathRecursively(flinkPluginsPath, TARGET_ROOT, container);
        }
 
        public static Builder newBuilder() {
@@ -85,6 +89,7 @@ public class FlinkDistributionOverlay extends 
AbstractContainerOverlay {
                File flinkBinPath;
                File flinkConfPath;
                File flinkLibPath;
+               File flinkPluginsPath;
 
                /**
                 * Configures the overlay using the current environment.
@@ -97,12 +102,13 @@ public class FlinkDistributionOverlay extends 
AbstractContainerOverlay {
                        flinkBinPath = 
getObligatoryFileFromEnvironment(ENV_FLINK_BIN_DIR);
                        flinkConfPath = 
getObligatoryFileFromEnvironment(ENV_FLINK_CONF_DIR);
                        flinkLibPath = 
getObligatoryFileFromEnvironment(ENV_FLINK_LIB_DIR);
+                       flinkPluginsPath = 
getObligatoryFileFromEnvironment(ENV_FLINK_PLUGINS_DIR);
 
                        return this;
                }
 
                public FlinkDistributionOverlay build() {
-                       return new FlinkDistributionOverlay(flinkBinPath, 
flinkConfPath, flinkLibPath);
+                       return new FlinkDistributionOverlay(flinkBinPath, 
flinkConfPath, flinkLibPath, flinkPluginsPath);
                }
 
                private static File getObligatoryFileFromEnvironment(String 
envVariableName) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
index e77dd3a..ef7a5e3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java
@@ -30,6 +30,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -49,6 +50,7 @@ public class FlinkDistributionOverlayTest extends 
ContainerOverlayTestBase {
 
                File binFolder = tempFolder.newFolder("bin");
                File libFolder = tempFolder.newFolder("lib");
+               File pluginsFolder = tempFolder.newFolder("plugins");
                File confFolder = tempFolder.newFolder("conf");
 
                Path[] files = createPaths(
@@ -58,14 +60,17 @@ public class FlinkDistributionOverlayTest extends 
ContainerOverlayTestBase {
                        "lib/foo.jar",
                        "lib/A/foo.jar",
                        "lib/B/foo.jar",
-                       "lib/B/bar.jar");
+                       "lib/B/bar.jar",
+                       "plugins/P1/plugin1a.jar",
+                       "plugins/P1/plugin1b.jar",
+                       "plugins/P2/plugin2.jar");
 
                ContainerSpecification containerSpecification = new 
ContainerSpecification();
                FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
                        binFolder,
                        confFolder,
-                       libFolder
-               );
+                       libFolder,
+                       pluginsFolder);
                overlay.configure(containerSpecification);
 
                for(Path file : files) {
@@ -79,12 +84,14 @@ public class FlinkDistributionOverlayTest extends 
ContainerOverlayTestBase {
 
                File binFolder = tempFolder.newFolder("bin");
                File libFolder = tempFolder.newFolder("lib");
+               File pluginsFolder = tempFolder.newFolder("plugins");
                File confFolder = tempFolder.newFolder("conf");
 
                // adjust the test environment for the purposes of this test
                Map<String, String> map = new HashMap<String, 
String>(System.getenv());
                map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath());
                map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+               map.put(ENV_FLINK_PLUGINS_DIR, pluginsFolder.getAbsolutePath());
                map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
                CommonTestUtils.setEnv(map);
 
@@ -92,18 +99,24 @@ public class FlinkDistributionOverlayTest extends 
ContainerOverlayTestBase {
 
                assertEquals(binFolder.getAbsolutePath(), 
builder.flinkBinPath.getAbsolutePath());
                assertEquals(libFolder.getAbsolutePath(), 
builder.flinkLibPath.getAbsolutePath());
+               assertEquals(pluginsFolder.getAbsolutePath(), 
builder.flinkPluginsPath.getAbsolutePath());
                assertEquals(confFolder.getAbsolutePath(), 
builder.flinkConfPath.getAbsolutePath());
        }
 
        @Test
        public void testBuilderFromEnvironmentBad() throws Exception {
+               testBuilderFromEnvironmentBad(ENV_FLINK_BIN_DIR);
+               testBuilderFromEnvironmentBad(ENV_FLINK_LIB_DIR);
+               testBuilderFromEnvironmentBad(ENV_FLINK_PLUGINS_DIR);
+               testBuilderFromEnvironmentBad(ENV_FLINK_CONF_DIR);
+       }
+
+       public void testBuilderFromEnvironmentBad(String 
obligatoryEnvironmentVariable) throws Exception {
                Configuration conf = new Configuration();
 
                // adjust the test environment for the purposes of this test
                Map<String, String> map = new HashMap<>(System.getenv());
-               map.remove(ENV_FLINK_BIN_DIR);
-               map.remove(ENV_FLINK_LIB_DIR);
-               map.remove(ENV_FLINK_CONF_DIR);
+               map.remove(obligatoryEnvironmentVariable);
                CommonTestUtils.setEnv(map);
 
                try {
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 3135ecf..ebf470c 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -102,6 +102,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static 
org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
 import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
@@ -700,7 +701,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               addLibFolderToShipFiles(systemShipFiles);
+               addEnvironmentFoldersToShipFiles(systemShipFiles);
 
                // Set-up ApplicationSubmissionContext for the application
 
@@ -1513,23 +1514,32 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
        }
 
-       protected void addLibFolderToShipFiles(Collection<File> 
effectiveShipFiles) {
+       protected void addEnvironmentFoldersToShipFiles(Collection<File> 
effectiveShipFiles) {
                // Add lib folder to the ship files if the environment variable 
is set.
                // This is for convenience when running from the command-line.
                // (for other files users explicitly set the ship files)
                String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
                if (libDir != null) {
-                       File libDirFile = new File(libDir);
-                       if (libDirFile.isDirectory()) {
-                               effectiveShipFiles.add(libDirFile);
-                       } else {
-                               throw new YarnDeploymentException("The 
environment variable '" + ENV_FLINK_LIB_DIR +
-                                       "' is set to '" + libDir + "' but the 
directory doesn't exist.");
-                       }
+                       addEnvFolderToShipFiles(effectiveShipFiles, libDir, 
ENV_FLINK_LIB_DIR);
                } else if (this.shipFiles.isEmpty()) {
                        LOG.warn("Environment variable '{}' not set and ship 
files have not been provided manually. " +
                                "Not shipping any library files.", 
ENV_FLINK_LIB_DIR);
                }
+
+               String pluginsDir = System.getenv().get(ENV_FLINK_PLUGINS_DIR);
+               if (pluginsDir != null) {
+                       addEnvFolderToShipFiles(effectiveShipFiles, pluginsDir, 
ENV_FLINK_PLUGINS_DIR);
+               }
+       }
+
+       private void addEnvFolderToShipFiles(Collection<File> 
effectiveShipFiles, String directory, String environmentVariableName) {
+               File directoryFile = new File(directory);
+               if (directoryFile.isDirectory()) {
+                       effectiveShipFiles.add(directoryFile);
+               } else {
+                       throw new YarnDeploymentException("The environment 
variable '" + environmentVariableName +
+                               "' is set to '" + directory + "' but the 
directory doesn't exist.");
+               }
        }
 
        protected ContainerLaunchContext setupApplicationMasterContainer(
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 6c2e42e..1d1d44f 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -435,10 +435,10 @@ public class YarnClusterDescriptorTest extends TestLogger 
{
        }
 
        /**
-        * Tests to ship a lib folder through the {@code 
YarnClusterDescriptor.addShipFiles}.
+        * Tests to ship files through the {@code 
YarnClusterDescriptor.addShipFiles}.
         */
        @Test
-       public void testExplicitLibShipping() throws Exception {
+       public void testExplicitFileShipping() throws Exception {
                try (YarnClusterDescriptor descriptor = new 
YarnClusterDescriptor(
                        new Configuration(),
                        yarnConfiguration,
@@ -464,7 +464,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
                        // only execute part of the deployment to test for 
shipped files
                        Set<File> effectiveShipFiles = new HashSet<>();
-                       descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+                       
descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles);
 
                        Assert.assertEquals(0, effectiveShipFiles.size());
                        Assert.assertEquals(2, descriptor.shipFiles.size());
@@ -473,11 +473,17 @@ public class YarnClusterDescriptorTest extends TestLogger 
{
                }
        }
 
-       /**
-        * Tests to ship a lib folder through the {@code 
ConfigConstants.ENV_FLINK_LIB_DIR}.
-        */
        @Test
        public void testEnvironmentLibShipping() throws Exception {
+               
testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_LIB_DIR);
+       }
+
+       @Test
+       public void testEnvironmentPluginsShipping() throws Exception {
+               
testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR);
+       }
+
+       public void testEnvironmentDirectoryShipping(String 
environmentVariable) throws Exception {
                try (YarnClusterDescriptor descriptor = new 
YarnClusterDescriptor(
                        new Configuration(),
                        yarnConfiguration,
@@ -493,10 +499,10 @@ public class YarnClusterDescriptorTest extends TestLogger 
{
                        final Map<String, String> oldEnv = System.getenv();
                        try {
                                Map<String, String> env = new HashMap<>(1);
-                               env.put(ConfigConstants.ENV_FLINK_LIB_DIR, 
libFolder.getAbsolutePath());
+                               env.put(environmentVariable, 
libFolder.getAbsolutePath());
                                CommonTestUtils.setEnv(env);
                                // only execute part of the deployment to test 
for shipped files
-                               
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+                               
descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles);
                        } finally {
                                CommonTestUtils.setEnv(oldEnv);
                        }

Reply via email to