Repository: flink
Updated Branches:
  refs/heads/master 96590ffaf -> 0483ba583


[FLINK-3675][yarn] improvements to library shipping

- always ship the lib folder
- properly set up the classpath from the supplied ship files
- clean up the deploy() method of YarnClusterDescriptor
- add test case

This closes #2187


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0483ba58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0483ba58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0483ba58

Branch: refs/heads/master
Commit: 0483ba583c7790d13b8035c2916318a2b58c67d6
Parents: 0e8be41
Author: Maximilian Michels <[email protected]>
Authored: Mon Jun 27 15:13:56 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jul 1 15:22:31 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  11 +-
 .../flink/configuration/ConfigConstants.java    |   9 +
 flink-dist/src/main/flink-bin/bin/config.sh     |   5 +
 .../src/main/flink-bin/yarn-bin/yarn-session.sh |   2 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   8 +-
 .../yarn/TestingYarnClusterDescriptor.java      |   2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   4 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   5 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   | 114 +++++++
 .../org/apache/flink/yarn/YarnTestBase.java     |   5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 312 +++++++++++--------
 .../main/java/org/apache/flink/yarn/Utils.java  |  17 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   9 +-
 .../org/apache/flink/yarn/YarnConfigKeys.java   |   2 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  11 +-
 pom.xml                                         |   2 +-
 17 files changed, 352 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 1322f23..404cc21 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -113,7 +113,6 @@ public class CliFrontend {
        private static final String ACTION_SAVEPOINT = "savepoint";
 
        // config dir parameters
-       public static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
        private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
        private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 
@@ -153,7 +152,7 @@ public class CliFrontend {
                // load the configuration
                LOG.info("Trying to load configuration file");
                
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-               System.setProperty(ENV_CONFIG_DIRECTORY, 
configDirectory.getAbsolutePath());
+               System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, 
configDirectory.getAbsolutePath());
                this.config = GlobalConfiguration.getConfiguration();
 
                try {
@@ -1022,8 +1021,8 @@ public class CliFrontend {
        // 
--------------------------------------------------------------------------------------------
 
        public static String getConfigurationDirectoryFromEnv() {
-               String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
-               String location = envLocation != null ? envLocation : 
System.getProperty(ENV_CONFIG_DIRECTORY);
+               String envLocation = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+               String location = envLocation != null ? envLocation : 
System.getProperty(ConfigConstants.ENV_FLINK_CONF_DIR);
 
                if (location != null) {
                        if (new File(location).exists()) {
@@ -1031,7 +1030,7 @@ public class CliFrontend {
                        }
                        else {
                                throw new RuntimeException("The config 
directory '" + location + "', specified in the '" +
-                                               ENV_CONFIG_DIRECTORY + "' 
environment variable, does not exist.");
+                                       ConfigConstants.ENV_FLINK_CONF_DIR + "' 
environment variable, does not exist.");
                        }
                }
                else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
@@ -1043,7 +1042,7 @@ public class CliFrontend {
                else {
                        throw new RuntimeException("The configuration directory 
was not specified. " +
                                        "Please specify the directory 
containing the configuration file through the '" +
-                                       ENV_CONFIG_DIRECTORY + "' environment 
variable.");
+                               ConfigConstants.ENV_FLINK_CONF_DIR + "' 
environment variable.");
                }
                return location;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a7fc274..548acb7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -950,6 +950,15 @@ public final class ConfigConstants {
        /** ZooKeeper default leader port. */
        public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
+       // ----------------------------- Environment Variables 
----------------------------
+
+       /** The environment variable name which contains the location of the 
configuration directory */
+       public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
+
+       /** The environment variable name which contains the location of the 
lib folder */
+       public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
+
+
        /**
         * Not instantiable.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index e57e6f2..9ffa713 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -135,6 +135,11 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
 FLINK_ROOT_DIR=`dirname "$SYMLINK_RESOLVED_BIN"`
 FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib
 
+### Exported environment variables ###
+export FLINK_CONF_DIR
+# export /lib dir to access it during deployment of the Yarn staging files
+export FLINK_LIB_DIR
+
 # These need to be mangled because they are directly passed to java.
 # The above lib path is used by the shell script to retrieve jars in a
 # directory, so it needs to be unmangled.

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh 
b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 16f8ab9..7c92680 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath 
$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR  $log_setting 
org.apache.flink.yarn.cli.FlinkYarnSessionCli -ship $bin/../lib/ -j 
$FLINK_LIB_DIR/flink-dist*.jar "$@"
+$JAVA_RUN $JVM_ARGS -classpath 
$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR  $log_setting 
org.apache.flink.yarn.cli.FlinkYarnSessionCli -j $FLINK_LIB_DIR/flink-dist*.jar 
"$@"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 323c10b..a99c835 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -86,7 +86,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 
                // Unset FLINK_CONF_DIR, as this is a precondition for this 
test to work properly
                Map<String, String> map = new HashMap<>(System.getenv());
-               map.remove(CliFrontend.ENV_CONFIG_DIRECTORY);
+               map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
                TestBaseUtils.setEnv(map);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index ba249c2..12a6180 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.yarn;
 import akka.actor.ActorSystem;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.RunOptions;
@@ -61,14 +61,14 @@ public class FlinkYarnSessionCliTest {
                File tmpFolder = tmp.newFolder();
                File fakeConf = new File(tmpFolder, "flink-conf.yaml");
                fakeConf.createNewFile();
-               map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+               map.put(ConfigConstants.ENV_FLINK_CONF_DIR, 
tmpFolder.getAbsolutePath());
                TestBaseUtils.setEnv(map);
                FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", 
false);
                Options options = new Options();
                cli.addGeneralOptions(options);
                cli.addRunOptions(options);
 
-               CommandLineParser parser = new PosixParser();
+               CommandLineParser parser = new DefaultParser();
                CommandLine cmd = null;
                try {
                        cmd = parser.parse(options, new String[]{"run", "-j", 
"fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 386f48f..3ed0dc1 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -46,7 +46,7 @@ public class TestingYarnClusterDescriptor extends 
AbstractYarnClusterDescriptor
                filesToShip.add(testingJar);
                filesToShip.add(testingRuntimeJar);
 
-               setShipFiles(filesToShip);
+               addShipFiles(filesToShip);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 1b7db26..75445e1 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -103,9 +103,9 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
                flinkYarnClient.setJobManagerMemory(768);
                flinkYarnClient.setTaskManagerMemory(1024);
                flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+               
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
-               String confDirPath = System.getenv("FLINK_CONF_DIR");
+               String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                flinkYarnClient.setConfigurationDirectory(confDirPath);
 
                String fsStateHandlePath = tmp.getRoot().getPath();

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index fe5400a..8a2ad60 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.configuration.GlobalConfiguration;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -221,8 +222,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                flinkYarnClient.setJobManagerMemory(768);
                flinkYarnClient.setTaskManagerMemory(1024);
                flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-               String confDirPath = System.getenv("FLINK_CONF_DIR");
+               
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+               String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                flinkYarnClient.setConfigurationDirectory(confDirPath);
                
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
new file mode 100644
index 0000000..5cf3ddc
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class YarnClusterDescriptorTest {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Tests to ship a lib folder through the {@code 
YarnClusterDescriptor.addShipFiles}
+        */
+       @Test
+       public void testExplicitLibShipping() throws Exception {
+               AbstractYarnClusterDescriptor descriptor = new 
YarnClusterDescriptor();
+               descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+
+               
descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
+               descriptor.setConfigurationFilePath(new 
Path(temporaryFolder.getRoot().getPath()));
+               descriptor.setFlinkConfiguration(new Configuration());
+
+               File libFile = temporaryFolder.newFile("libFile.jar");
+               File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+
+               Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+               Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+
+               List<File> shipFiles = new ArrayList<>();
+               shipFiles.add(libFile);
+               shipFiles.add(libFolder);
+
+               descriptor.addShipFiles(shipFiles);
+
+               Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+               Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+
+               // only execute part of the deployment to test for shipped files
+               Set<File> effectiveShipFiles = new HashSet<>();
+               descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+
+               Assert.assertEquals(0, effectiveShipFiles.size());
+               Assert.assertEquals(2, descriptor.shipFiles.size());
+               Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+               Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+       }
+
+       /**
+        * Tests to ship a lib folder through the {@code 
ConfigConstants.ENV_FLINK_LIB_DIR}
+        */
+       @Test
+       public void testEnvironmentLibShipping() throws Exception {
+               AbstractYarnClusterDescriptor descriptor = new 
YarnClusterDescriptor();
+
+               
descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
+               descriptor.setConfigurationFilePath(new 
Path(temporaryFolder.getRoot().getPath()));
+               descriptor.setFlinkConfiguration(new Configuration());
+
+               File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+               File libFile = new File(libFolder, "libFile.jar");
+               libFile.createNewFile();
+
+               Set<File> effectiveShipFiles = new HashSet<>();
+
+               final Map<String, String> oldEnv = System.getenv();
+               try {
+                       Map<String, String> env = new HashMap<>(1);
+                       env.put(ConfigConstants.ENV_FLINK_LIB_DIR, 
libFolder.getAbsolutePath());
+                       TestBaseUtils.setEnv(env);
+                       // only execute part of the deployment to test for 
shipped files
+                       descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+               } finally {
+                       TestBaseUtils.setEnv(oldEnv);
+               }
+
+               // only ship the folder itself, not its contents
+               Assert.assertFalse(effectiveShipFiles.contains(libFile));
+               Assert.assertTrue(effectiveShipFiles.contains(libFolder));
+               Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+               Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 8ad5bcb..76b5d31 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
@@ -365,7 +366,7 @@ public abstract class YarnTestBase extends TestLogger {
                        File flinkConfDirPath = findFile(flinkDistRootDir, new 
ContainsName(new String[]{"flink-conf.yaml"}));
                        Assert.assertNotNull(flinkConfDirPath);
 
-                       map.put(CliFrontend.ENV_CONFIG_DIRECTORY, 
flinkConfDirPath.getParent());
+                       map.put(ConfigConstants.ENV_FLINK_CONF_DIR, 
flinkConfDirPath.getParent());
 
                        File yarnConfFile = writeYarnSiteConfigXML(conf);
                        map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
@@ -596,7 +597,7 @@ public abstract class YarnTestBase extends TestLogger {
        public static void teardown() throws Exception {
                // Unset FLINK_CONF_DIR, as it might change the behavior of 
other tests
                Map<String, String> map = new HashMap<>(System.getenv());
-               map.remove(CliFrontend.ENV_CONFIG_DIRECTORY);
+               map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
                TestBaseUtils.setEnv(map);
 
                // When we are on travis, we copy the tmp files of JUnit 
(containing the MiniYARNCluster log files)

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 81690c4..08a55cd 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -67,9 +67,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
@@ -107,19 +111,19 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        private int taskManagerCount = 1;
 
-       private String yarnQueue = null;
+       private String yarnQueue;
 
        private String configurationDirectory;
 
        private Path flinkConfigurationPath;
 
-       private Path flinkLoggingConfigurationPath; // optional
-
        private Path flinkJarPath;
 
        private String dynamicPropertiesEncoded;
 
-       private List<File> shipFiles = new ArrayList<>();
+       /** List of files to ship */
+       protected List<File> shipFiles = new LinkedList<>();
+
        private org.apache.flink.configuration.Configuration flinkConfiguration;
 
        private boolean detached;
@@ -137,34 +141,19 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               // load the config
-               this.configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
-               GlobalConfiguration.loadConfiguration(configurationDirectory);
-               this.flinkConfiguration = 
GlobalConfiguration.getConfiguration();
-
-               File confFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_NAME);
-               if (!confFile.exists()) {
-                       throw new RuntimeException("Unable to locate 
configuration file in " + confFile);
-               }
-               flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+               // tries to load the config through the environment, if it 
fails it can still be set through the setters
+               try {
+                       this.configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
+                       
GlobalConfiguration.loadConfiguration(configurationDirectory);
+                       this.flinkConfiguration = 
GlobalConfiguration.getConfiguration();
 
-               //check if there is a logback or log4j file
-               if (configurationDirectory.length() > 0) {
-                       File logback = new File(configurationDirectory + 
File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
-                       if (logback.exists()) {
-                               shipFiles.add(logback);
-                               flinkLoggingConfigurationPath = new 
Path(logback.toURI());
-                       }
-                       File log4j = new File(configurationDirectory + 
File.pathSeparator + CONFIG_FILE_LOG4J_NAME);
-                       if (log4j.exists()) {
-                               shipFiles.add(log4j);
-                               if (flinkLoggingConfigurationPath != null) {
-                                       // this means there is already a 
logback configuration file --> fail
-                                       LOG.warn("The configuration directory 
('" + configurationDirectory + "') contains both LOG4J and " +
-                                               "Logback configuration files. 
Please delete or rename one of them.");
-                               }
-                               flinkLoggingConfigurationPath = new 
Path(log4j.toURI());
+                       File confFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_NAME);
+                       if (!confFile.exists()) {
+                               throw new RuntimeException("Unable to locate 
configuration file in " + confFile);
                        }
+                       flinkConfigurationPath = new 
Path(confFile.getAbsolutePath());
+               } catch (Exception e) {
+                       LOG.debug("Config couldn't be loaded from environment 
variable.");
                }
        }
 
@@ -227,14 +216,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                this.configurationDirectory = configurationDirectory;
        }
 
-       public void setFlinkLoggingConfigurationPath(Path logConfPath) {
-               flinkLoggingConfigurationPath = logConfPath;
-       }
-
-       public Path getFlinkLoggingConfigurationPath() {
-               return flinkLoggingConfigurationPath;
-       }
-
        public void setTaskManagerCount(int tmCount) {
                if(tmCount < 1) {
                        throw new IllegalArgumentException("The TaskManager 
count has to be at least 1.");
@@ -246,8 +227,8 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                return this.taskManagerCount;
        }
 
-       public void setShipFiles(List<File> shipFiles) {
-               for(File shipFile: shipFiles) {
+       public void addShipFiles(List<File> shipFiles) {
+               for (File shipFile: shipFiles) {
                        // remove uberjar from ship list (by default everything 
in the lib/ folder is added to
                        // the list of files to ship, but we handle the uberjar 
separately.
                        if(!(shipFile.getName().startsWith("flink-dist") && 
shipFile.getName().endsWith("jar"))) {
@@ -432,26 +413,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
                LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
 
-               // Create application via yarnClient
                final YarnClient yarnClient = getYarnClient();
-               final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
-               GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
 
-               // ------------------ Add dynamic properties to local 
flinkConfiguraton ------
 
-               Map<String, String> dynProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
-               for (Map.Entry<String, String> dynProperty : 
dynProperties.entrySet()) {
-                       flinkConfiguration.setString(dynProperty.getKey(), 
dynProperty.getValue());
-               }
-
-               // ------------------ Set default file system scheme 
-------------------------
-
-               try {
-                       
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
-               } catch (IOException e) {
-                       throw new IOException("Error while setting the default 
" +
-                               "filesystem scheme from configuration.", e);
-               }
                // ------------------ Check if the specified queue exists 
--------------------
 
                try {
@@ -482,6 +446,35 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
+               // ------------------ Add dynamic properties to local 
flinkConfiguration ------
+
+               Map<String, String> dynProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
+               for (Map.Entry<String, String> dynProperty : 
dynProperties.entrySet()) {
+                       flinkConfiguration.setString(dynProperty.getKey(), 
dynProperty.getValue());
+               }
+
+               // ------------------ Set default file system scheme 
-------------------------
+
+               try {
+                       
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
+               } catch (IOException e) {
+                       throw new IOException("Error while setting the default 
" +
+                               "filesystem scheme from configuration.", e);
+               }
+
+               // initialize file system
+               // Copy the application master jar to the filesystem
+               // Create a local resource to point to the destination jar path
+               final FileSystem fs = FileSystem.get(conf);
+
+               // hard coded check for the GoogleHDFS client because it's not 
overriding the getScheme() method.
+               if 
(!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+                       fs.getScheme().startsWith("file")) {
+                       LOG.warn("The file system scheme is '" + fs.getScheme() 
+ "'. This indicates that the "
+                               + "specified Hadoop configuration path is wrong 
and the system is using the default Hadoop configuration values."
+                               + "The Flink YARN client needs to store its 
files in a distributed file system");
+               }
+
                // ------------------ Check if the YARN ClusterClient has the 
requested resources --------------
 
                // the yarnMinAllocationMB specifies the smallest possible 
container allocation size.
@@ -502,6 +495,10 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        taskManagerMemoryMb =  yarnMinAllocationMB;
                }
 
+               // Create application via yarnClient
+               final YarnClientApplication yarnApplication = 
yarnClient.createApplication();
+               GetNewApplicationResponse appResponse = 
yarnApplication.getNewApplicationResponse();
+
                Resource maxRes = appResponse.getMaximumResourceCapability();
                final String NOTE = "Please check the 
'yarn.scheduler.maximum-allocation-mb' and the 
'yarn.nodemanager.resource.memory-mb' configuration values\n";
                if(jobManagerMemoryMb > maxRes.getMemory() ) {
@@ -556,62 +553,32 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               // ------------------ Prepare Application Master Container  
------------------------------
-
-               // respect custom JVM options in the YAML file
-               final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
-               String logbackFile = configurationDirectory + File.separator + 
CONFIG_FILE_LOGBACK_NAME;
-               boolean hasLogback = new File(logbackFile).exists();
-               String log4jFile = configurationDirectory + File.separator + 
CONFIG_FILE_LOG4J_NAME;
-
-               boolean hasLog4j = new File(log4jFile).exists();
-               if(hasLogback) {
-                       shipFiles.add(new File(logbackFile));
-               }
-               if(hasLog4j) {
-                       shipFiles.add(new File(log4jFile));
+               Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
+               for (File file : shipFiles) {
+                       effectiveShipFiles.add(file.getAbsoluteFile());
                }
 
-               // Set up the container launch context for the application 
master
-               ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
-
-               String amCommand = "$JAVA_HOME/bin/java"
-                       + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, 
flinkConfiguration)
-                       + "M " + javaOpts;
-
-               if(hasLogback || hasLog4j) {
-                       amCommand += " -Dlog.file=\"" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
-
-                       if(hasLogback) {
-                               amCommand += " 
-Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
-                       }
+               //check if there is a logback or log4j file
+               File logbackFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOGBACK_NAME);
+               final boolean hasLogback = logbackFile.exists();
+               if (hasLogback) {
+                       effectiveShipFiles.add(logbackFile);
+               }
 
-                       if(hasLog4j) {
-                               amCommand += " -Dlog4j.configuration=file:" + 
CONFIG_FILE_LOG4J_NAME;
+               File log4jFile = new File(configurationDirectory + 
File.separator + CONFIG_FILE_LOG4J_NAME);
+               final boolean hasLog4j = log4jFile.exists();
+               if (hasLog4j) {
+                       effectiveShipFiles.add(log4jFile);
+                       if (hasLogback) {
+                               // this means there is already a logback 
configuration file --> warn
+                               LOG.warn("The configuration directory ('" + 
configurationDirectory + "') contains both LOG4J and " +
+                                       "Logback configuration files. Please 
delete or rename one of them.");
                        }
                }
 
-               amCommand += " " + getApplicationMasterClass().getName() + " "
-                       + " 1>"
-                       + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.out"
-                       + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.err";
-               amContainer.setCommands(Collections.singletonList(amCommand));
+               addLibFolderToShipFiles(effectiveShipFiles);
 
-               LOG.debug("Application Master start command: " + amCommand);
-
-               // intialize HDFS
-               // Copy the application master jar to the filesystem
-               // Create a local resource to point to the destination jar path
-               final FileSystem fs = FileSystem.get(conf);
-
-               // hard coded check for the GoogleHDFS client because its not 
overriding the getScheme() method.
-               if 
(!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
-                       fs.getScheme().startsWith("file")) {
-                       LOG.warn("The file system scheme is '" + fs.getScheme() 
+ "'. This indicates that the "
-                               + "specified Hadoop configuration path is wrong 
and the system is using the default Hadoop configuration values."
-                               + "The Flink YARN client needs to store its 
files in a distributed file system");
-               }
+               final ContainerLaunchContext amContainer = 
setupApplicationMasterContainer(hasLogback, hasLog4j);
 
                // Set-up ApplicationSubmissionContext for the application
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
@@ -634,52 +601,74 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
                final ApplicationId appId = appContext.getApplicationId();
 
-               // Setup jar for ApplicationMaster
-               LocalResource appMasterJar = 
Records.newRecord(LocalResource.class);
-               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
-               Path remotePathJar = Utils.setupLocalResource(fs, 
appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
-               Path remotePathConf = Utils.setupLocalResource(fs, 
appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
-               Map<String, LocalResource> localResources = new HashMap<>(2);
-               localResources.put("flink.jar", appMasterJar);
-               localResources.put("flink-conf.yaml", flinkConf);
-
-
-               // setup security tokens (code from apache storm)
-               final Path[] paths = new Path[2 + shipFiles.size()];
+               // local resource map for Yarn
+               final Map<String, LocalResource> localResources = new 
HashMap<>(2 + effectiveShipFiles.size());
+               // list of remote paths (after upload)
+               final List<Path> paths = new ArrayList<>(2 + 
effectiveShipFiles.size());
+               // classpath assembler
+               final StringBuilder classPathBuilder = new StringBuilder();
+               // ship list that enables reuse of resources for task manager 
containers
                StringBuilder envShipFileList = new StringBuilder();
-               // upload ship files
-               for (int i = 0; i < shipFiles.size(); i++) {
-                       File shipFile = shipFiles.get(i);
+
+               // upload and register ship files
+               for (File shipFile : effectiveShipFiles) {
                        LocalResource shipResources = 
Records.newRecord(LocalResource.class);
+
                        Path shipLocalPath = new Path("file://" + 
shipFile.getAbsolutePath());
-                       paths[2 + i] = Utils.setupLocalResource(fs, 
appId.toString(),
-                               shipLocalPath, shipResources, 
fs.getHomeDirectory());
+                       Path remotePath =
+                               Utils.setupLocalResource(fs, appId.toString(), 
shipLocalPath, shipResources, fs.getHomeDirectory());
+
+                       paths.add(remotePath);
+
                        localResources.put(shipFile.getName(), shipResources);
 
-                       envShipFileList.append(paths[2 + i]);
-                       if(i+1 < shipFiles.size()) {
-                               envShipFileList.append(',');
+                       
classPathBuilder.append(ApplicationConstants.Environment.PWD.$());
+                       classPathBuilder.append(File.separator);
+                       classPathBuilder.append(shipFile.getName());
+                       if (shipFile.isDirectory()) {
+                               // add directories to the classpath
+                               
classPathBuilder.append(File.separator).append("*");
                        }
+                       classPathBuilder.append(File.pathSeparator);
+
+                       envShipFileList.append(remotePath).append(",");
                }
 
-               paths[0] = remotePathJar;
-               paths[1] = remotePathConf;
+               // Setup jar for ApplicationMaster
+               LocalResource appMasterJar = 
Records.newRecord(LocalResource.class);
+               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
+               Path remotePathJar =
+                       Utils.setupLocalResource(fs, appId.toString(), 
flinkJarPath, appMasterJar, fs.getHomeDirectory());
+               Path remotePathConf =
+                       Utils.setupLocalResource(fs, appId.toString(), 
flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+               localResources.put("flink.jar", appMasterJar);
+               localResources.put("flink-conf.yaml", flinkConf);
+
+               paths.add(remotePathJar);
+               
classPathBuilder.append(ApplicationConstants.Environment.PWD.$()).append(File.separator)
+                       .append("flink.jar").append(File.pathSeparator);
+               paths.add(remotePathConf);
+               
classPathBuilder.append(ApplicationConstants.Environment.PWD.$()).append(File.separator)
+                       .append("flink-conf.yaml").append(File.pathSeparator);
+
                sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + 
appId.toString() + "/");
 
                FsPermission permission = new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE);
                fs.setPermission(sessionFilesDir, permission); // set 
permission for path.
 
+               // setup security tokens
                Utils.setTokensFor(amContainer, paths, conf);
 
                amContainer.setLocalResources(localResources);
                fs.close();
 
-               // Setup CLASSPATH for ApplicationMaster
-               Map<String, String> appMasterEnv = new HashMap<>();
+               // Setup CLASSPATH and environment variables for 
ApplicationMaster
+               final Map<String, String> appMasterEnv = new HashMap<>();
                // set user specified app master environment variables
                
appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
 flinkConfiguration));
-               // set classpath from YARN configuration
-               Utils.setupEnv(conf, appMasterEnv);
+               // set Flink app class path
+               appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, 
classPathBuilder.toString());
+
                // set Flink on YARN internal configuration values
                appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, 
String.valueOf(taskManagerCount));
                appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, 
String.valueOf(taskManagerMemoryMb));
@@ -695,6 +684,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, 
dynamicPropertiesEncoded);
                }
 
+               // set classpath from YARN configuration
+               Utils.setupYarnClassPath(conf, appMasterEnv);
+
                amContainer.setEnvironment(appMasterEnv);
 
                // Set up resource type requirements for ApplicationMaster
@@ -778,7 +770,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
                String host = report.getHost();
                int port = report.getRpcPort();
-               String trackingURL = report.getTrackingUrl();
 
                // Correctly initialize the Flink config
                
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
@@ -1015,6 +1006,61 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
        }
 
+       protected void addLibFolderToShipFiles(Set<File> effectiveShipFiles) {
+               // Add lib folder to the ship files if the environment variable 
is set.
+               // This is for convenience when running from the command-line.
+               // (for other files users explicitly set the ship files)
+               String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
+               if (libDir != null) {
+                       File libDirFile = new File(libDir);
+                       if (libDirFile.isDirectory()) {
+                               effectiveShipFiles.add(libDirFile);
+                       } else {
+                               throw new YarnDeploymentException("The 
environment variable '" + ENV_FLINK_LIB_DIR +
+                                       "' is set to '" + libDir + "' but the 
directory doesn't exist.");
+                       }
+               } else if (this.shipFiles.isEmpty()) {
+                       LOG.warn("Environment variable '{}' not set and ship 
files have not been provided manually. " +
+                               "Not shipping any library files.", 
ENV_FLINK_LIB_DIR);
+               }
+       }
+
+       protected ContainerLaunchContext 
setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) {
+               // ------------------ Prepare Application Master Container  
------------------------------
+
+               // respect custom JVM options in the YAML file
+               final String javaOpts = 
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+               // Set up the container launch context for the application 
master
+               ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
+
+               String amCommand = "$JAVA_HOME/bin/java"
+                       + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, 
flinkConfiguration)
+                       + "M " + javaOpts;
+
+               if (hasLogback || hasLog4j) {
+                       amCommand += " -Dlog.file=\"" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
+
+                       if(hasLogback) {
+                               amCommand += " 
-Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
+                       }
+
+                       if(hasLog4j) {
+                               amCommand += " -Dlog4j.configuration=file:" + 
CONFIG_FILE_LOG4J_NAME;
+                       }
+               }
+
+               amCommand += " " + getApplicationMasterClass().getName() + " "
+                       + " 1>"
+                       + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.out"
+                       + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.err";
+               amContainer.setCommands(Collections.singletonList(amCommand));
+
+               LOG.debug("Application Master start command: " + amCommand);
+
+               return amContainer;
+       }
+
        /**
         * Creates a YarnClusterClient; may be overriden in tests
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 4e8f747..f56c024 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -94,9 +95,15 @@ public final class Utils {
        }
 
 
-       public static void setupEnv(Configuration conf, Map<String, String> 
appMasterEnv) {
-               addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), 
Environment.PWD.$() + File.separator + "*");
-               for (String c: 
conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
 {
+       public static void setupYarnClassPath(Configuration conf, Map<String, 
String> appMasterEnv) {
+               addToEnvironment(
+                       appMasterEnv,
+                       Environment.CLASSPATH.name(),
+                       appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+               String[] applicationClassPathEntries = conf.getStrings(
+                       YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                       YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
+               for (String c : applicationClassPathEntries) {
                        addToEnvironment(appMasterEnv, 
Environment.CLASSPATH.name(), c.trim());
                }
        }
@@ -133,10 +140,10 @@ public final class Utils {
                
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
        }
 
-       public static void setTokensFor(ContainerLaunchContext amContainer, 
Path[] paths, Configuration conf) throws IOException {
+       public static void setTokensFor(ContainerLaunchContext amContainer, 
List<Path> paths, Configuration conf) throws IOException {
                Credentials credentials = new Credentials();
                // for HDFS
-               TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
+               TokenCache.obtainTokensForNamenodes(credentials, 
paths.toArray(new Path[0]), conf);
                // for HBase
                obtainTokenForHBase(credentials, conf);
                // for user

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 24b5a35..563425f 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -66,6 +66,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
 /**
  * This class is the executable entry point for the YARN application master.
  * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
@@ -517,6 +519,9 @@ public class YarnApplicationMasterRunner {
                String yarnClientUsername = 
env.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
                require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_CLIENT_USERNAME);
 
+               String classPathString = 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+               require(classPathString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
                // obtain a handle to the file system used by YARN
                final org.apache.hadoop.fs.FileSystem yarnFileSystem;
                try {
@@ -582,7 +587,9 @@ public class YarnApplicationMasterRunner {
                containerEnv.putAll(tmParams.taskManagerEnv());
 
                // add YARN classpath, etc to the container environment
-               Utils.setupEnv(yarnConfig, containerEnv);
+               containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+               Utils.setupYarnClassPath(yarnConfig, containerEnv);
+
                containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, 
yarnClientUsername);
 
                ctx.setEnvironment(containerEnv);

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index 7dc86a5..1a02712 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -37,6 +37,8 @@ public class YarnConfigKeys {
        public static final String ENV_DETACHED = "_DETACHED";
        public static final String ENV_DYNAMIC_PROPERTIES = 
"_DYNAMIC_PROPERTIES";
 
+       public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
+
        public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the 
Flink jar resource location (in HDFS).
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index a0225a7..126f0f1 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -43,14 +43,12 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -269,18 +267,13 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        String shipPath = 
cmd.getOptionValue(SHIP_PATH.getOpt());
                        File shipDir = new File(shipPath);
                        if (shipDir.isDirectory()) {
-                               shipFiles = new 
ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
-                                       @Override
-                                       public boolean accept(File dir, String 
name) {
-                                               return !(name.equals(".") || 
name.equals(".."));
-                                       }
-                               })));
+                               shipFiles.add(shipDir);
                        } else {
                                LOG.warn("Ship directory is not a directory. 
Ignoring it.");
                        }
                }
 
-               yarnClusterDescriptor.setShipFiles(shipFiles);
+               yarnClusterDescriptor.addShipFiles(shipFiles);
 
                // queue
                if (cmd.hasOption(QUEUE.getOpt())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0483ba58/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9da3fa5..c3d4ea8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -505,7 +505,7 @@ under the License.
                <profile>
                        <!-- used for aggregating  ScalaDoc with JavaDoc -->
                        <id>aggregate-scaladoc</id>
-                        <dependencies>
+                       <dependencies>
                                <dependency>
                                        <!--
                                        This is necessary for building the java 
docs using Java 8. Otherwise the javadoc

Reply via email to