This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b5dc213ea7a0bdbf021a10cd54ec452003bb5173 Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Apr 24 18:23:56 2019 +0200 [FLINK-12143] Distribute FLINK_PLUGINS_DIR the same way as FLINK_LIB_DIR is distributed --- flink-container/docker/Dockerfile | 1 + .../flink/configuration/ConfigConstants.java | 3 +++ flink-dist/src/main/flink-bin/bin/config.sh | 2 ++ flink-dist/src/main/flink-bin/bin/flink.bat | 1 + .../src/main/flink-bin/bin/start-cluster.bat | 1 + .../overlays/FlinkDistributionOverlay.java | 12 +++++++--- .../overlays/FlinkDistributionOverlayTest.java | 25 ++++++++++++++----- .../flink/yarn/AbstractYarnClusterDescriptor.java | 28 +++++++++++++++------- .../flink/yarn/YarnClusterDescriptorTest.java | 22 ++++++++++------- 9 files changed, 69 insertions(+), 26 deletions(-) diff --git a/flink-container/docker/Dockerfile b/flink-container/docker/Dockerfile index 853ac36..129b5c5 100644 --- a/flink-container/docker/Dockerfile +++ b/flink-container/docker/Dockerfile @@ -25,6 +25,7 @@ RUN apk add --no-cache bash snappy libc6-compat ENV FLINK_INSTALL_PATH=/opt ENV FLINK_HOME $FLINK_INSTALL_PATH/flink ENV FLINK_LIB_DIR $FLINK_HOME/lib +ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins ENV FLINK_OPT_DIR $FLINK_HOME/opt ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts ENV PATH $PATH:$FLINK_HOME/bin diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a4de6cc..082ec56 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -2011,6 +2011,9 @@ public final class ConfigConstants { /** The environment variable name which contains the location of the opt directory. */ public static final String ENV_FLINK_OPT_DIR = "FLINK_OPT_DIR"; + /** The environment variable name which contains the location of the plugins folder. */ + public static final String ENV_FLINK_PLUGINS_DIR = "FLINK_PLUGINS_DIR"; + /** The environment variable name which contains the location of the bin directory. */ public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR"; diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index e430e91..79fa6ad 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -298,6 +298,7 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` # Define the main directory of the flink installation FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"` FLINK_LIB_DIR=$FLINK_HOME/lib +FLINK_PLUGINS_DIR=$FLINK_HOME/plugins FLINK_OPT_DIR=$FLINK_HOME/opt @@ -314,6 +315,7 @@ YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE} ### Exported environment variables ### export FLINK_CONF_DIR export FLINK_BIN_DIR +export FLINK_PLUGINS_DIR # export /lib dir to access it during deployment of the Yarn staging files export FLINK_LIB_DIR # export /opt dir to access it for the SQL client diff --git a/flink-dist/src/main/flink-bin/bin/flink.bat b/flink-dist/src/main/flink-bin/bin/flink.bat index 8ed1fcf..a88a4db 100644 --- a/flink-dist/src/main/flink-bin/bin/flink.bat +++ b/flink-dist/src/main/flink-bin/bin/flink.bat @@ -22,6 +22,7 @@ setlocal SET bin=%~dp0 SET FLINK_HOME=%bin%.. SET FLINK_LIB_DIR=%FLINK_HOME%\lib +SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins SET JVM_ARGS=-Xmx512m diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.bat b/flink-dist/src/main/flink-bin/bin/start-cluster.bat index a852a55..9c01cb2 100644 --- a/flink-dist/src/main/flink-bin/bin/start-cluster.bat +++ b/flink-dist/src/main/flink-bin/bin/start-cluster.bat @@ -22,6 +22,7 @@ setlocal EnableDelayedExpansion SET bin=%~dp0 SET FLINK_HOME=%bin%.. SET FLINK_LIB_DIR=%FLINK_HOME%\lib +SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins SET FLINK_CONF_DIR=%FLINK_HOME%\conf SET FLINK_LOG_DIR=%FLINK_HOME%\log diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java index baebd8c..a430e0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java @@ -21,17 +21,18 @@ package org.apache.flink.runtime.clusterframework.overlays; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.clusterframework.ContainerSpecification; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Map; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -56,11 +57,13 @@ public class FlinkDistributionOverlay extends AbstractContainerOverlay { final File flinkBinPath; final File flinkConfPath; final File flinkLibPath; + final File flinkPluginsPath; - public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) { + public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath, File flinkPluginsPath) { this.flinkBinPath = checkNotNull(flinkBinPath); this.flinkConfPath = checkNotNull(flinkConfPath); this.flinkLibPath = checkNotNull(flinkLibPath); + this.flinkPluginsPath = checkNotNull(flinkPluginsPath); } @Override @@ -72,6 +75,7 @@ public class FlinkDistributionOverlay extends AbstractContainerOverlay { addPathRecursively(flinkBinPath, TARGET_ROOT, container); addPathRecursively(flinkConfPath, TARGET_ROOT, container); addPathRecursively(flinkLibPath, TARGET_ROOT, container); + addPathRecursively(flinkPluginsPath, TARGET_ROOT, container); } public static Builder newBuilder() { @@ -85,6 +89,7 @@ public class FlinkDistributionOverlay extends AbstractContainerOverlay { File flinkBinPath; File flinkConfPath; File flinkLibPath; + File flinkPluginsPath; /** * Configures the overlay using the current environment. @@ -97,12 +102,13 @@ public class FlinkDistributionOverlay extends AbstractContainerOverlay { flinkBinPath = getObligatoryFileFromEnvironment(ENV_FLINK_BIN_DIR); flinkConfPath = getObligatoryFileFromEnvironment(ENV_FLINK_CONF_DIR); flinkLibPath = getObligatoryFileFromEnvironment(ENV_FLINK_LIB_DIR); + flinkPluginsPath = getObligatoryFileFromEnvironment(ENV_FLINK_PLUGINS_DIR); return this; } public FlinkDistributionOverlay build() { - return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath); + return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath, flinkPluginsPath); } private static File getObligatoryFileFromEnvironment(String envVariableName) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java index e77dd3a..ef7a5e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlayTest.java @@ -30,6 +30,7 @@ import java.io.File; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -49,6 +50,7 @@ public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase { File binFolder = tempFolder.newFolder("bin"); File libFolder = tempFolder.newFolder("lib"); + File pluginsFolder = tempFolder.newFolder("plugins"); File confFolder = tempFolder.newFolder("conf"); Path[] files = createPaths( @@ -58,14 +60,17 @@ public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase { "lib/foo.jar", "lib/A/foo.jar", "lib/B/foo.jar", - "lib/B/bar.jar"); + "lib/B/bar.jar", + "plugins/P1/plugin1a.jar", + "plugins/P1/plugin1b.jar", + "plugins/P2/plugin2.jar"); ContainerSpecification containerSpecification = new ContainerSpecification(); FlinkDistributionOverlay overlay = new FlinkDistributionOverlay( binFolder, confFolder, - libFolder - ); + libFolder, + pluginsFolder); overlay.configure(containerSpecification); for(Path file : files) { @@ -79,12 +84,14 @@ public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase { File binFolder = tempFolder.newFolder("bin"); File libFolder = tempFolder.newFolder("lib"); + File pluginsFolder = tempFolder.newFolder("plugins"); File confFolder = tempFolder.newFolder("conf"); // adjust the test environment for the purposes of this test Map<String, String> map = new HashMap<String, String>(System.getenv()); map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath()); map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath()); + map.put(ENV_FLINK_PLUGINS_DIR, pluginsFolder.getAbsolutePath()); map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); CommonTestUtils.setEnv(map); @@ -92,18 +99,24 @@ public class FlinkDistributionOverlayTest extends ContainerOverlayTestBase { assertEquals(binFolder.getAbsolutePath(), builder.flinkBinPath.getAbsolutePath()); assertEquals(libFolder.getAbsolutePath(), builder.flinkLibPath.getAbsolutePath()); + assertEquals(pluginsFolder.getAbsolutePath(), builder.flinkPluginsPath.getAbsolutePath()); assertEquals(confFolder.getAbsolutePath(), builder.flinkConfPath.getAbsolutePath()); } @Test public void testBuilderFromEnvironmentBad() throws Exception { + testBuilderFromEnvironmentBad(ENV_FLINK_BIN_DIR); + testBuilderFromEnvironmentBad(ENV_FLINK_LIB_DIR); + testBuilderFromEnvironmentBad(ENV_FLINK_PLUGINS_DIR); + testBuilderFromEnvironmentBad(ENV_FLINK_CONF_DIR); + } + + public void testBuilderFromEnvironmentBad(String obligatoryEnvironmentVariable) throws Exception { Configuration conf = new Configuration(); // adjust the test environment for the purposes of this test Map<String, String> map = new HashMap<>(System.getenv()); - map.remove(ENV_FLINK_BIN_DIR); - map.remove(ENV_FLINK_LIB_DIR); - map.remove(ENV_FLINK_CONF_DIR); + map.remove(obligatoryEnvironmentVariable); CommonTestUtils.setEnv(map); try { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 3135ecf..ebf470c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -102,6 +102,7 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR; import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; @@ -700,7 +701,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - addLibFolderToShipFiles(systemShipFiles); + addEnvironmentFoldersToShipFiles(systemShipFiles); // Set-up ApplicationSubmissionContext for the application @@ -1513,23 +1514,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - protected void addLibFolderToShipFiles(Collection<File> effectiveShipFiles) { + protected void addEnvironmentFoldersToShipFiles(Collection<File> effectiveShipFiles) { // Add lib folder to the ship files if the environment variable is set. // This is for convenience when running from the command-line. // (for other files users explicitly set the ship files) String libDir = System.getenv().get(ENV_FLINK_LIB_DIR); if (libDir != null) { - File libDirFile = new File(libDir); - if (libDirFile.isDirectory()) { - effectiveShipFiles.add(libDirFile); - } else { - throw new YarnDeploymentException("The environment variable '" + ENV_FLINK_LIB_DIR + - "' is set to '" + libDir + "' but the directory doesn't exist."); - } + addEnvFolderToShipFiles(effectiveShipFiles, libDir, ENV_FLINK_LIB_DIR); } else if (this.shipFiles.isEmpty()) { LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. " + "Not shipping any library files.", ENV_FLINK_LIB_DIR); } + + String pluginsDir = System.getenv().get(ENV_FLINK_PLUGINS_DIR); + if (pluginsDir != null) { + addEnvFolderToShipFiles(effectiveShipFiles, pluginsDir, ENV_FLINK_PLUGINS_DIR); + } + } + + private void addEnvFolderToShipFiles(Collection<File> effectiveShipFiles, String directory, String environmentVariableName) { + File directoryFile = new File(directory); + if (directoryFile.isDirectory()) { + effectiveShipFiles.add(directoryFile); + } else { + throw new YarnDeploymentException("The environment variable '" + environmentVariableName + + "' is set to '" + directory + "' but the directory doesn't exist."); + } } protected ContainerLaunchContext setupApplicationMasterContainer( diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 6c2e42e..1d1d44f 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -435,10 +435,10 @@ public class YarnClusterDescriptorTest extends TestLogger { } /** - * Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles}. + * Tests to ship files through the {@code YarnClusterDescriptor.addShipFiles}. */ @Test - public void testExplicitLibShipping() throws Exception { + public void testExplicitFileShipping() throws Exception { try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor( new Configuration(), yarnConfiguration, @@ -464,7 +464,7 @@ public class YarnClusterDescriptorTest extends TestLogger { // only execute part of the deployment to test for shipped files Set<File> effectiveShipFiles = new HashSet<>(); - descriptor.addLibFolderToShipFiles(effectiveShipFiles); + descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles); Assert.assertEquals(0, effectiveShipFiles.size()); Assert.assertEquals(2, descriptor.shipFiles.size()); @@ -473,11 +473,17 @@ public class YarnClusterDescriptorTest extends TestLogger { } } - /** - * Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR}. - */ @Test public void testEnvironmentLibShipping() throws Exception { + testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_LIB_DIR); + } + + @Test + public void testEnvironmentPluginsShipping() throws Exception { + testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR); + } + + public void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception { try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor( new Configuration(), yarnConfiguration, @@ -493,10 +499,10 @@ public class YarnClusterDescriptorTest extends TestLogger { final Map<String, String> oldEnv = System.getenv(); try { Map<String, String> env = new HashMap<>(1); - env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath()); + env.put(environmentVariable, libFolder.getAbsolutePath()); CommonTestUtils.setEnv(env); // only execute part of the deployment to test for shipped files - descriptor.addLibFolderToShipFiles(effectiveShipFiles); + descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles); } finally { CommonTestUtils.setEnv(oldEnv); }
