Repository: flink
Updated Branches:
  refs/heads/master 96590ffaf -> 0483ba583
[FLINK-3675][yarn] improvements to library shipping - always ship the lib folder - properly setup the classpath from the supplied ship files - cleanup deploy() method of YarnClusterDescriptor - add test case This closes #2187 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0483ba58 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0483ba58 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0483ba58 Branch: refs/heads/master Commit: 0483ba583c7790d13b8035c2916318a2b58c67d6 Parents: 0e8be41 Author: Maximilian Michels <[email protected]> Authored: Mon Jun 27 15:13:56 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Jul 1 15:22:31 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 11 +- .../flink/configuration/ConfigConstants.java | 9 + flink-dist/src/main/flink-bin/bin/config.sh | 5 + .../src/main/flink-bin/yarn-bin/yarn-session.sh | 2 +- ...CliFrontendYarnAddressConfigurationTest.java | 2 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 8 +- .../yarn/TestingYarnClusterDescriptor.java | 2 +- .../flink/yarn/YARNHighAvailabilityITCase.java | 4 +- .../flink/yarn/YARNSessionFIFOITCase.java | 5 +- .../flink/yarn/YarnClusterDescriptorTest.java | 114 +++++++ .../org/apache/flink/yarn/YarnTestBase.java | 5 +- .../yarn/AbstractYarnClusterDescriptor.java | 312 +++++++++++-------- .../main/java/org/apache/flink/yarn/Utils.java | 17 +- .../flink/yarn/YarnApplicationMasterRunner.java | 9 +- .../org/apache/flink/yarn/YarnConfigKeys.java | 2 + .../flink/yarn/cli/FlinkYarnSessionCli.java | 11 +- pom.xml | 2 +- 17 files changed, 352 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 1322f23..404cc21 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -113,7 +113,6 @@ public class CliFrontend { private static final String ACTION_SAVEPOINT = "savepoint"; // config dir parameters - public static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR"; private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; @@ -153,7 +152,7 @@ public class CliFrontend { // load the configuration LOG.info("Trying to load configuration file"); GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); - System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath()); + System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, configDirectory.getAbsolutePath()); this.config = GlobalConfiguration.getConfiguration(); try { @@ -1022,8 +1021,8 @@ public class CliFrontend { // -------------------------------------------------------------------------------------------- public static String getConfigurationDirectoryFromEnv() { - String envLocation = System.getenv(ENV_CONFIG_DIRECTORY); - String location = envLocation != null ? 
envLocation : System.getProperty(ENV_CONFIG_DIRECTORY); + String envLocation = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); + String location = envLocation != null ? envLocation : System.getProperty(ConfigConstants.ENV_FLINK_CONF_DIR); if (location != null) { if (new File(location).exists()) { @@ -1031,7 +1030,7 @@ public class CliFrontend { } else { throw new RuntimeException("The config directory '" + location + "', specified in the '" + - ENV_CONFIG_DIRECTORY + "' environment variable, does not exist."); + ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist."); } } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) { @@ -1043,7 +1042,7 @@ public class CliFrontend { else { throw new RuntimeException("The configuration directory was not specified. " + "Please specify the directory containing the configuration file through the '" + - ENV_CONFIG_DIRECTORY + "' environment variable."); + ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable."); } return location; } http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a7fc274..548acb7 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -950,6 +950,15 @@ public final class ConfigConstants { /** ZooKeeper default leader port. */ public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888; + // ----------------------------- Environment Variables ---------------------------- + + /** The environment variable name which contains the location of the configuration directory */ + public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR"; + + /** The environment variable name which contains the location of the lib folder */ + public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR"; + + /** * Not instantiable. */ http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-dist/src/main/flink-bin/bin/config.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index e57e6f2..9ffa713 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -135,6 +135,11 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` FLINK_ROOT_DIR=`dirname "$SYMLINK_RESOLVED_BIN"` FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib +### Exported environment variables ### +export FLINK_CONF_DIR +# export /lib dir to access it during deployment of the Yarn staging files +export FLINK_LIB_DIR + # These need to be mangled because they are directly passed to java. # The above lib path is used by the shell script to retrieve jars in a # directory, so it needs to be unmangled. 
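
The FLINK_LIB_DIR variable exported above is what the client-side deployment code further down in this diff (AbstractYarnClusterDescriptor#addLibFolderToShipFiles) reads in order to find the lib folder to ship. A simplified sketch of that lookup, assuming the ConfigConstants entry introduced above and omitting the error handling of the real method:

    String libDir = System.getenv(ConfigConstants.ENV_FLINK_LIB_DIR); // "FLINK_LIB_DIR", exported by config.sh
    if (libDir != null && new File(libDir).isDirectory()) {
        effectiveShipFiles.add(new File(libDir)); // ship the whole lib folder, not its individual files
    }
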
http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh index 16f8ab9..7c92680 100755 --- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh +++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh @@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4 export FLINK_CONF_DIR -$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@" +$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j $FLINK_LIB_DIR/flink-dist*.jar "$@" http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 323c10b..a99c835 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -86,7 +86,7 @@ public class CliFrontendYarnAddressConfigurationTest { // Unset FLINK_CONF_DIR, as this is a precondition for this test to work properly Map<String, String> map = new HashMap<>(System.getenv()); - map.remove(CliFrontend.ENV_CONFIG_DIRECTORY); + map.remove(ConfigConstants.ENV_FLINK_CONF_DIR); TestBaseUtils.setEnv(map); } http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index ba249c2..12a6180 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -21,9 +21,9 @@ package org.apache.flink.yarn; import akka.actor.ActorSystem; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; - +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.RunOptions; @@ -61,14 +61,14 @@ public class FlinkYarnSessionCliTest { File tmpFolder = tmp.newFolder(); File fakeConf = new File(tmpFolder, "flink-conf.yaml"); fakeConf.createNewFile(); - map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath()); + map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpFolder.getAbsolutePath()); TestBaseUtils.setEnv(map); FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false); Options options = new Options(); 
cli.addGeneralOptions(options); cli.addRunOptions(options); - CommandLineParser parser = new PosixParser(); + CommandLineParser parser = new DefaultParser(); CommandLine cmd = null; try { cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"}); http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java index 386f48f..3ed0dc1 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java @@ -46,7 +46,7 @@ public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor filesToShip.add(testingJar); filesToShip.add(testingRuntimeJar); - setShipFiles(filesToShip); + addShipFiles(filesToShip); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 1b7db26..75445e1 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -103,9 +103,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { flinkYarnClient.setJobManagerMemory(768); flinkYarnClient.setTaskManagerMemory(1024); flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); + flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); - String confDirPath = System.getenv("FLINK_CONF_DIR"); + String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); flinkYarnClient.setConfigurationDirectory(confDirPath); String fsStateHandlePath = tmp.getRoot().getPath(); http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index fe5400a..8a2ad60 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.yarn; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; @@ -221,8 +222,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase { flinkYarnClient.setJobManagerMemory(768); flinkYarnClient.setTaskManagerMemory(1024); flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath())); - flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); - String confDirPath = System.getenv("FLINK_CONF_DIR"); + flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); + String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); flinkYarnClient.setConfigurationDirectory(confDirPath); flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java new file mode 100644 index 0000000..5cf3ddc --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class YarnClusterDescriptorTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles} + */ + @Test + public void testExplicitLibShipping() throws Exception { + AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(); + descriptor.setLocalJarPath(new Path("/path/to/flink.jar")); + + descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath()); + descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath())); + descriptor.setFlinkConfiguration(new Configuration()); + + File libFile = temporaryFolder.newFile("libFile.jar"); + File libFolder = temporaryFolder.newFolder().getAbsoluteFile(); + + Assert.assertFalse(descriptor.shipFiles.contains(libFile)); + Assert.assertFalse(descriptor.shipFiles.contains(libFolder)); + + List<File> shipFiles = new ArrayList<>(); + shipFiles.add(libFile); + shipFiles.add(libFolder); + + descriptor.addShipFiles(shipFiles); + + Assert.assertTrue(descriptor.shipFiles.contains(libFile)); + Assert.assertTrue(descriptor.shipFiles.contains(libFolder)); + + // only execute part of the deployment to test for shipped files + Set<File> effectiveShipFiles = new HashSet<>(); + descriptor.addLibFolderToShipFiles(effectiveShipFiles); + + Assert.assertEquals(0, effectiveShipFiles.size()); + Assert.assertEquals(2, descriptor.shipFiles.size()); + Assert.assertTrue(descriptor.shipFiles.contains(libFile)); + Assert.assertTrue(descriptor.shipFiles.contains(libFolder)); + } + + /** + * Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR} + */ + @Test + public void testEnvironmentLibShipping() throws Exception { + AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(); + + descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath()); + descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath())); + descriptor.setFlinkConfiguration(new Configuration()); + + File libFolder = temporaryFolder.newFolder().getAbsoluteFile(); + File libFile = new File(libFolder, "libFile.jar"); + libFile.createNewFile(); + + Set<File> effectiveShipFiles = new HashSet<>(); + + final Map<String, String> oldEnv = System.getenv(); + try { + Map<String, String> env = new HashMap<>(1); + env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath()); + TestBaseUtils.setEnv(env); + // only execute part of the deployment to test for shipped files + descriptor.addLibFolderToShipFiles(effectiveShipFiles); + } finally { + TestBaseUtils.setEnv(oldEnv); + } + + // only add the ship the folder, not the contents + Assert.assertFalse(effectiveShipFiles.contains(libFile)); + Assert.assertTrue(effectiveShipFiles.contains(libFolder)); + Assert.assertFalse(descriptor.shipFiles.contains(libFile)); + Assert.assertFalse(descriptor.shipFiles.contains(libFolder)); + } + +} 
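
For illustration, the setters exercised by this test are the programmatic way to configure a YARN deployment after this change; addShipFiles() now appends to the ship list instead of replacing it. A minimal sketch with placeholder paths, assuming the usual imports (org.apache.hadoop.fs.Path, org.apache.flink.configuration.Configuration, java.util.Arrays):

    AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
    descriptor.setLocalJarPath(new Path("/path/to/flink-dist.jar"));                // uberjar, handled separately from the ship list
    descriptor.setConfigurationDirectory("/path/to/conf");
    descriptor.setConfigurationFilePath(new Path("/path/to/conf/flink-conf.yaml"));
    descriptor.setFlinkConfiguration(new Configuration());
    // both plain files and directories may be shipped; directories end up on the classpath with a wildcard
    descriptor.addShipFiles(Arrays.asList(new File("/path/to/user-udfs.jar"), new File("/path/to/extra-lib")));
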
http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 8ad5bcb..76b5d31 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.yarn; import org.apache.commons.io.FileUtils; import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -365,7 +366,7 @@ public abstract class YarnTestBase extends TestLogger { File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"})); Assert.assertNotNull(flinkConfDirPath); - map.put(CliFrontend.ENV_CONFIG_DIRECTORY, flinkConfDirPath.getParent()); + map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent()); File yarnConfFile = writeYarnSiteConfigXML(conf); map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); @@ -596,7 +597,7 @@ public abstract class YarnTestBase extends TestLogger { public static void teardown() throws Exception { // Unset FLINK_CONF_DIR, as it might change the behavior of other tests Map<String, String> map = new HashMap<>(System.getenv()); - map.remove(CliFrontend.ENV_CONFIG_DIRECTORY); + map.remove(ConfigConstants.ENV_FLINK_CONF_DIR); TestBaseUtils.setEnv(map); // When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files) http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 81690c4..08a55cd 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -67,9 +67,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties; @@ -107,19 +111,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor private int taskManagerCount = 1; - private String yarnQueue = null; + private String yarnQueue; private String configurationDirectory; private Path flinkConfigurationPath; - private Path flinkLoggingConfigurationPath; // optional - private Path flinkJarPath; private String dynamicPropertiesEncoded; - private List<File> shipFiles = new ArrayList<>(); + /** Lazily initialized list of files to ship */ + protected 
List<File> shipFiles = new LinkedList<>(); + private org.apache.flink.configuration.Configuration flinkConfiguration; private boolean detached; @@ -137,34 +141,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - // load the config - this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); - GlobalConfiguration.loadConfiguration(configurationDirectory); - this.flinkConfiguration = GlobalConfiguration.getConfiguration(); - - File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME); - if (!confFile.exists()) { - throw new RuntimeException("Unable to locate configuration file in " + confFile); - } - flinkConfigurationPath = new Path(confFile.getAbsolutePath()); + // tries to load the config through the environment, if it fails it can still be set through the setters + try { + this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + GlobalConfiguration.loadConfiguration(configurationDirectory); + this.flinkConfiguration = GlobalConfiguration.getConfiguration(); - //check if there is a logback or log4j file - if (configurationDirectory.length() > 0) { - File logback = new File(configurationDirectory + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME); - if (logback.exists()) { - shipFiles.add(logback); - flinkLoggingConfigurationPath = new Path(logback.toURI()); - } - File log4j = new File(configurationDirectory + File.pathSeparator + CONFIG_FILE_LOG4J_NAME); - if (log4j.exists()) { - shipFiles.add(log4j); - if (flinkLoggingConfigurationPath != null) { - // this means there is already a logback configuration file --> fail - LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " + - "Logback configuration files. Please delete or rename one of them."); - } - flinkLoggingConfigurationPath = new Path(log4j.toURI()); + File confFile = new File(configurationDirectory + File.separator + CONFIG_FILE_NAME); + if (!confFile.exists()) { + throw new RuntimeException("Unable to locate configuration file in " + confFile); } + flinkConfigurationPath = new Path(confFile.getAbsolutePath()); + } catch (Exception e) { + LOG.debug("Config couldn't be loaded from environment variable."); } } @@ -227,14 +216,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor this.configurationDirectory = configurationDirectory; } - public void setFlinkLoggingConfigurationPath(Path logConfPath) { - flinkLoggingConfigurationPath = logConfPath; - } - - public Path getFlinkLoggingConfigurationPath() { - return flinkLoggingConfigurationPath; - } - public void setTaskManagerCount(int tmCount) { if(tmCount < 1) { throw new IllegalArgumentException("The TaskManager count has to be at least 1."); @@ -246,8 +227,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor return this.taskManagerCount; } - public void setShipFiles(List<File> shipFiles) { - for(File shipFile: shipFiles) { + public void addShipFiles(List<File> shipFiles) { + for (File shipFile: shipFiles) { // remove uberjar from ship list (by default everything in the lib/ folder is added to // the list of files to ship, but we handle the uberjar separately. 
if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) { @@ -432,26 +413,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor LOG.info("\tJobManager memory = {}", jobManagerMemoryMb); LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb); - // Create application via yarnClient final YarnClient yarnClient = getYarnClient(); - final YarnClientApplication yarnApplication = yarnClient.createApplication(); - GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); - // ------------------ Add dynamic properties to local flinkConfiguraton ------ - Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded); - for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) { - flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); - } - - // ------------------ Set default file system scheme ------------------------- - - try { - org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); - } catch (IOException e) { - throw new IOException("Error while setting the default " + - "filesystem scheme from configuration.", e); - } // ------------------ Check if the specified queue exists -------------------- try { @@ -482,6 +446,35 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } + // ------------------ Add dynamic properties to local flinkConfiguraton ------ + + Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) { + flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); + } + + // ------------------ Set default file system scheme ------------------------- + + try { + org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + + // initialize file system + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + final FileSystem fs = FileSystem.get(conf); + + // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. + if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && + fs.getScheme().startsWith("file")) { + LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " + + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values." + + "The Flink YARN client needs to store its files in a distributed file system"); + } + // ------------------ Check if the YARN ClusterClient has the requested resources -------------- // the yarnMinAllocationMB specifies the smallest possible container allocation size. 
@@ -502,6 +495,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor taskManagerMemoryMb = yarnMinAllocationMB; } + // Create application via yarnClient + final YarnClientApplication yarnApplication = yarnClient.createApplication(); + GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse(); + Resource maxRes = appResponse.getMaximumResourceCapability(); final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n"; if(jobManagerMemoryMb > maxRes.getMemory() ) { @@ -556,62 +553,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - // ------------------ Prepare Application Master Container ------------------------------ - - // respect custom JVM options in the YAML file - final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); - - String logbackFile = configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME; - boolean hasLogback = new File(logbackFile).exists(); - String log4jFile = configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME; - - boolean hasLog4j = new File(log4jFile).exists(); - if(hasLogback) { - shipFiles.add(new File(logbackFile)); - } - if(hasLog4j) { - shipFiles.add(new File(log4jFile)); + Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size()); + for (File file : shipFiles) { + effectiveShipFiles.add(file.getAbsoluteFile()); } - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - - String amCommand = "$JAVA_HOME/bin/java" - + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) - + "M " + javaOpts; - - if(hasLogback || hasLog4j) { - amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\""; - - if(hasLogback) { - amCommand += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME; - } + //check if there is a logback or log4j file + File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME); + final boolean hasLogback = logbackFile.exists(); + if (hasLogback) { + effectiveShipFiles.add(logbackFile); + } - if(hasLog4j) { - amCommand += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME; + File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME); + final boolean hasLog4j = log4jFile.exists(); + if (hasLog4j) { + effectiveShipFiles.add(log4jFile); + if (hasLogback) { + // this means there is already a logback configuration file --> fail + LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " + + "Logback configuration files. Please delete or rename one of them."); } } - amCommand += " " + getApplicationMasterClass().getName() + " " - + " 1>" - + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out" - + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err"; - amContainer.setCommands(Collections.singletonList(amCommand)); + addLibFolderToShipFiles(effectiveShipFiles); - LOG.debug("Application Master start command: " + amCommand); - - // intialize HDFS - // Copy the application master jar to the filesystem - // Create a local resource to point to the destination jar path - final FileSystem fs = FileSystem.get(conf); - - // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. 
- if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && - fs.getScheme().startsWith("file")) { - LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " - + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values." - + "The Flink YARN client needs to store its files in a distributed file system"); - } + final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j); // Set-up ApplicationSubmissionContext for the application ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); @@ -634,52 +601,74 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor final ApplicationId appId = appContext.getApplicationId(); - // Setup jar for ApplicationMaster - LocalResource appMasterJar = Records.newRecord(LocalResource.class); - LocalResource flinkConf = Records.newRecord(LocalResource.class); - Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); - Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); - Map<String, LocalResource> localResources = new HashMap<>(2); - localResources.put("flink.jar", appMasterJar); - localResources.put("flink-conf.yaml", flinkConf); - - - // setup security tokens (code from apache storm) - final Path[] paths = new Path[2 + shipFiles.size()]; + // local resource map for Yarn + final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size()); + // list of remote paths (after upload) + final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size()); + // classpath assembler + final StringBuilder classPathBuilder = new StringBuilder(); + // ship list that enables reuse of resources for task manager containers StringBuilder envShipFileList = new StringBuilder(); - // upload ship files - for (int i = 0; i < shipFiles.size(); i++) { - File shipFile = shipFiles.get(i); + + // upload and register ship files + for (File shipFile : effectiveShipFiles) { LocalResource shipResources = Records.newRecord(LocalResource.class); + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - paths[2 + i] = Utils.setupLocalResource(fs, appId.toString(), - shipLocalPath, shipResources, fs.getHomeDirectory()); + Path remotePath = + Utils.setupLocalResource(fs, appId.toString(), shipLocalPath, shipResources, fs.getHomeDirectory()); + + paths.add(remotePath); + localResources.put(shipFile.getName(), shipResources); - envShipFileList.append(paths[2 + i]); - if(i+1 < shipFiles.size()) { - envShipFileList.append(','); + classPathBuilder.append(ApplicationConstants.Environment.PWD.$()); + classPathBuilder.append(File.separator); + classPathBuilder.append(shipFile.getName()); + if (shipFile.isDirectory()) { + // add directories to the classpath + classPathBuilder.append(File.separator).append("*"); } + classPathBuilder.append(File.pathSeparator); + + envShipFileList.append(remotePath).append(","); } - paths[0] = remotePathJar; - paths[1] = remotePathConf; + // Setup jar for ApplicationMaster + LocalResource appMasterJar = Records.newRecord(LocalResource.class); + LocalResource flinkConf = Records.newRecord(LocalResource.class); + Path remotePathJar = + Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); + Path remotePathConf = + Utils.setupLocalResource(fs, 
appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); + localResources.put("flink.jar", appMasterJar); + localResources.put("flink-conf.yaml", flinkConf); + + paths.add(remotePathJar); + classPathBuilder.append(ApplicationConstants.Environment.PWD.$()).append(File.separator) + .append("flink.jar").append(File.pathSeparator); + paths.add(remotePathConf); + classPathBuilder.append(ApplicationConstants.Environment.PWD.$()).append(File.separator) + .append("flink-conf.yaml").append(File.pathSeparator); + sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/"); FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); fs.setPermission(sessionFilesDir, permission); // set permission for path. + // setup security tokens Utils.setTokensFor(amContainer, paths, conf); amContainer.setLocalResources(localResources); fs.close(); - // Setup CLASSPATH for ApplicationMaster - Map<String, String> appMasterEnv = new HashMap<>(); + // Setup CLASSPATH and environment variables for ApplicationMaster + final Map<String, String> appMasterEnv = new HashMap<>(); // set user specified app master environment variables appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration)); - // set classpath from YARN configuration - Utils.setupEnv(conf, appMasterEnv); + // set Flink app class path + appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString()); + // set Flink on YARN internal configuration values appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount)); appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb)); @@ -695,6 +684,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded); } + // set classpath from YARN configuration + Utils.setupYarnClassPath(conf, appMasterEnv); + amContainer.setEnvironment(appMasterEnv); // Set up resource type requirements for ApplicationMaster @@ -778,7 +770,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor String host = report.getHost(); int port = report.getRpcPort(); - String trackingURL = report.getTrackingUrl(); // Correctly initialize the Flink config flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); @@ -1015,6 +1006,61 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } + protected void addLibFolderToShipFiles(Set<File> effectiveShipFiles) { + // Add lib folder to the ship files if the environment variable is set. + // This is for convenience when running from the command-line. + // (for other files users explicitly set the ship files) + String libDir = System.getenv().get(ENV_FLINK_LIB_DIR); + if (libDir != null) { + File libDirFile = new File(libDir); + if (libDirFile.isDirectory()) { + effectiveShipFiles.add(libDirFile); + } else { + throw new YarnDeploymentException("The environment variable '" + ENV_FLINK_LIB_DIR + + "' is set to '" + libDir + "' but the directory doesn't exist."); + } + } else if (this.shipFiles.isEmpty()) { + LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. 
" + + "Not shipping any library files.", ENV_FLINK_LIB_DIR); + } + } + + protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) { + // ------------------ Prepare Application Master Container ------------------------------ + + // respect custom JVM options in the YAML file + final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + String amCommand = "$JAVA_HOME/bin/java" + + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + + "M " + javaOpts; + + if (hasLogback || hasLog4j) { + amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\""; + + if(hasLogback) { + amCommand += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME; + } + + if(hasLog4j) { + amCommand += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME; + } + } + + amCommand += " " + getApplicationMasterClass().getName() + " " + + " 1>" + + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err"; + amContainer.setCommands(Collections.singletonList(amCommand)); + + LOG.debug("Application Master start command: " + amCommand); + + return amContainer; + } + /** * Creates a YarnClusterClient; may be overriden in tests */ http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 4e8f747..f56c024 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.flink.runtime.clusterframework.BootstrapTools; @@ -94,9 +95,15 @@ public final class Utils { } - public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) { - addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*"); - for (String c: conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) { + addToEnvironment( + appMasterEnv, + Environment.CLASSPATH.name(), + appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH)); + String[] applicationClassPathEntries = conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH); + for (String c : applicationClassPathEntries) { addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); } } @@ -133,10 +140,10 @@ public final class Utils { localResource.setVisibility(LocalResourceVisibility.APPLICATION); } - public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException { + public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException { Credentials credentials = new Credentials(); // for HDFS 
- TokenCache.obtainTokensForNamenodes(credentials, paths, conf); + TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf); // for HBase obtainTokenForHBase(credentials, conf); // for user http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 24b5a35..563425f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -66,6 +66,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; + /** * This class is the executable entry point for the YARN application master. * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} @@ -517,6 +519,9 @@ public class YarnApplicationMasterRunner { String yarnClientUsername = env.get(YarnConfigKeys.ENV_CLIENT_USERNAME); require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_USERNAME); + String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH); + require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH); + // obtain a handle to the file system used by YARN final org.apache.hadoop.fs.FileSystem yarnFileSystem; try { @@ -582,7 +587,9 @@ public class YarnApplicationMasterRunner { containerEnv.putAll(tmParams.taskManagerEnv()); // add YARN classpath, etc to the container environment - Utils.setupEnv(yarnConfig, containerEnv); + containerEnv.put(ENV_FLINK_CLASSPATH, classPathString); + Utils.setupYarnClassPath(yarnConfig, containerEnv); + containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername); ctx.setEnvironment(containerEnv); http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java index 7dc86a5..1a02712 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java @@ -37,6 +37,8 @@ public class YarnConfigKeys { public static final String ENV_DETACHED = "_DETACHED"; public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; + public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH"; + public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS). 
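
Taken together, the classpath handling above works in two steps: the client concatenates an entry for every shipped resource (with a trailing wildcard for directories) plus flink.jar and flink-conf.yaml, publishes it to the containers as _FLINK_CLASSPATH, and Utils.setupYarnClassPath() then places that value in front of the YARN application classpath. Roughly, on the client side (condensed from AbstractYarnClusterDescriptor above; effectiveShipFiles and appMasterEnv as defined there):

    StringBuilder classPathBuilder = new StringBuilder();
    for (File shipFile : effectiveShipFiles) {
        classPathBuilder.append(ApplicationConstants.Environment.PWD.$())
            .append(File.separator).append(shipFile.getName());
        if (shipFile.isDirectory()) {
            classPathBuilder.append(File.separator).append("*"); // expose the jars inside a shipped directory
        }
        classPathBuilder.append(File.pathSeparator);
    }
    appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
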
http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index a0225a7..126f0f1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -43,14 +43,12 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -269,18 +267,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); File shipDir = new File(shipPath); if (shipDir.isDirectory()) { - shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return !(name.equals(".") || name.equals("..")); - } - }))); + shipFiles.add(shipDir); } else { LOG.warn("Ship directory is not a directory. Ignoring it."); } } - yarnClusterDescriptor.setShipFiles(shipFiles); + yarnClusterDescriptor.addShipFiles(shipFiles); // queue if (cmd.hasOption(QUEUE.getOpt())) { http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9da3fa5..c3d4ea8 100644 --- a/pom.xml +++ b/pom.xml @@ -505,7 +505,7 @@ under the License. <profile> <!-- used for aggregating ScalaDoc with JavaDoc --> <id>aggregate-scaladoc</id> - <dependencies> + <dependencies> <dependency> <!-- This is necessary for building the java docs using Java 8. Otherwise the javadoc
