Repository: flink
Updated Branches:
  refs/heads/release-1.1 b9e6dcc3c -> 3b5d3c6f3


Revert "[FLINK-4913][yarn] include user jars in system class loader"

This reverts commit ea41b9c56fdc0af3c97d6dd48d04218db6176ec8.

This closes #2795


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5d3c6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5d3c6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5d3c6f

Branch: refs/heads/release-1.1
Commit: 3b5d3c6f359dbbdfebcf0b7c034264a3ed9ad12c
Parents: b9e6dcc
Author: Ufuk Celebi <[email protected]>
Authored: Sat Nov 12 20:49:17 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Fri Nov 25 16:14:52 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 14 ++----
 .../flink/client/cli/CustomCommandLine.java     | 14 +-----
 .../org/apache/flink/client/cli/DefaultCLI.java |  5 +-
 .../flink/client/program/ClusterClient.java     | 40 +++++-----------
 .../flink/client/program/PackagedProgram.java   | 46 ++++++-------------
 .../client/program/StandaloneClusterClient.java |  6 ---
 .../org/apache/flink/api/scala/FlinkShell.scala |  7 +--
 ...CliFrontendYarnAddressConfigurationTest.java |  5 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |  5 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 48 +-------------------
 .../apache/flink/yarn/YarnClusterClient.java    |  6 ---
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 10 +---
 12 files changed, 40 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3a322dc..69963fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -234,7 +234,7 @@ public class CliFrontend {
                ClusterClient client = null;
                try {
 
-                       client = createClient(options, program);
+                       client = createClient(options, 
program.getMainClassName());
                        
client.setPrintStatusDuringExecution(options.getStdoutLogging());
                        client.setDetached(options.getDetachedMode());
                        LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
@@ -871,12 +871,12 @@ public class CliFrontend {
        /**
         * Creates a {@link ClusterClient} object from the given command line 
options and other parameters.
         * @param options Command line options
-        * @param program The program for which to create the client.
+        * @param programName Program name
         * @throws Exception
         */
        protected ClusterClient createClient(
                        CommandLineOptions options,
-                       PackagedProgram program) throws Exception {
+                       String programName) throws Exception {
 
                // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
                CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
@@ -887,12 +887,8 @@ public class CliFrontend {
                        logAndSysout("Cluster configuration: " + 
client.getClusterIdentifier());
                } catch (UnsupportedOperationException e) {
                        try {
-                               String applicationName = "Flink Application: " 
+ program.getMainClassName();
-                               client = activeCommandLine.createCluster(
-                                       applicationName,
-                                       options.getCommandLine(),
-                                       config,
-                                       program.getAllLibraries());
+                               String applicationName = "Flink Application: " 
+ programName;
+                               client = 
activeCommandLine.createCluster(applicationName, options.getCommandLine(), 
config);
                                logAndSysout("Cluster started: " + 
client.getClusterIdentifier());
                        } catch (UnsupportedOperationException e2) {
                                throw new IllegalConfigurationException(

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index c58c74c..aecdc7c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -22,9 +22,6 @@ import org.apache.commons.cli.Options;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
-import java.net.URL;
-import java.util.List;
-
 
 /**
  * Custom command-line interface to load hooks for the command-line interface.
@@ -64,22 +61,15 @@ public interface CustomCommandLine<ClusterType extends 
ClusterClient> {
         * @return Client if a cluster could be retrieved
         * @throws UnsupportedOperationException if the operation is not 
supported
         */
-       ClusterType retrieveCluster(
-                       CommandLine commandLine,
-                       Configuration config) throws 
UnsupportedOperationException;
+       ClusterType retrieveCluster(CommandLine commandLine, Configuration 
config) throws UnsupportedOperationException;
 
        /**
         * Creates the client for the cluster
         * @param applicationName The application name to use
         * @param commandLine The command-line options parsed by the CliFrontend
         * @param config The Flink config to use
-        * @param userJarFiles User jar files to include in the classpath of 
the cluster.
         * @return The client to communicate with the cluster which the 
CustomCommandLine brought up.
         * @throws UnsupportedOperationException if the operation is not 
supported
         */
-       ClusterType createCluster(
-                       String applicationName,
-                       CommandLine commandLine,
-                       Configuration config,
-                       List<URL> userJarFiles) throws 
UnsupportedOperationException;
+       ClusterType createCluster(String applicationName, CommandLine 
commandLine, Configuration config) throws UnsupportedOperationException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 598c612..5f83c3d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -26,8 +26,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
 import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.List;
 
 import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
 
@@ -77,8 +75,7 @@ public class DefaultCLI implements 
CustomCommandLine<StandaloneClusterClient> {
        public StandaloneClusterClient createCluster(
                        String applicationName,
                        CommandLine commandLine,
-                       Configuration config,
-                       List<URL> userJarFiles) throws 
UnsupportedOperationException {
+                       Configuration config) throws 
UnsupportedOperationException {
 
                StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);
                return descriptor.deploy();

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 5e88af6..2d743fa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -307,27 +307,11 @@ public abstract class ClusterClient {
        {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
-
-                       final JobWithJars jobWithJars;
-                       if (hasUserJarsInClassPath(prog.getAllLibraries())) {
-                               jobWithJars = prog.getPlanWithoutJars();
-                       } else {
-                               jobWithJars = prog.getPlanWithJars();
-                       }
-
-                       return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
+                       return run(prog.getPlanWithJars(), parallelism, 
prog.getSavepointSettings());
                }
                else if (prog.isUsingInteractiveMode()) {
                        LOG.info("Starting program in interactive mode");
-
-                       final List<URL> libraries;
-                       if (hasUserJarsInClassPath(prog.getAllLibraries())) {
-                               libraries = Collections.emptyList();
-                       } else {
-                               libraries = prog.getAllLibraries();
-                       }
-
-                       ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
+                       ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, prog.getAllLibraries(),
                                        prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
                                        prog.getSavepointSettings());
                        ContextEnvironment.setAsContext(factory);
@@ -365,7 +349,7 @@ public abstract class ClusterClient {
         * Runs a program on the Flink cluster to which this client is 
connected. The call blocks until the
         * execution is complete, and returns afterwards.
         *
-        * @param jobWithJars The program to be executed.
+        * @param program The program to be executed.
         * @param parallelism The default parallelism to use when running the 
program. The default parallelism is used
         *                    when the program does not set a parallelism by 
itself.
         *
@@ -375,15 +359,15 @@ public abstract class ClusterClient {
         *                                    i.e. the job-manager is 
unreachable, or due to the fact that the
         *                                    parallel execution failed.
         */
-       public JobSubmissionResult run(JobWithJars jobWithJars, int 
parallelism, SavepointRestoreSettings savepointSettings)
+       public JobSubmissionResult run(JobWithJars program, int parallelism, 
SavepointRestoreSettings savepointSettings)
                        throws CompilerException, ProgramInvocationException {
-               ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
+               ClassLoader classLoader = program.getUserCodeClassLoader();
                if (classLoader == null) {
                        throw new IllegalArgumentException("The given 
JobWithJars does not provide a usercode class loader.");
                }
 
-               OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, 
parallelism);
-               return run(optPlan, jobWithJars.getJarFiles(), 
jobWithJars.getClasspaths(), classLoader, savepointSettings);
+               OptimizedPlan optPlan = getOptimizedPlan(compiler, program, 
parallelism);
+               return run(optPlan, program.getJarFiles(), 
program.getClasspaths(), classLoader, savepointSettings);
        }
 
        public JobSubmissionResult run(
@@ -614,6 +598,10 @@ public abstract class ClusterClient {
                return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
        }
 
+       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) 
throws ProgramInvocationException {
+               return getJobGraph(optPlan, prog.getAllLibraries(), 
prog.getClasspaths(), null);
+       }
+
        public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, 
SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
                return getJobGraph(optPlan, prog.getAllLibraries(), 
prog.getClasspaths(), savepointSettings);
        }
@@ -740,12 +728,6 @@ public abstract class ClusterClient {
        public abstract int getMaxSlots();
 
        /**
-        * Returns true if the client already has the user jar and providing it 
again would
-        * result in duplicate uploading of the jar.
-        */
-       public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
-
-       /**
         * Calls the subclasses' submitJob method. It may decide to simply call 
one of the run methods or it may perform
         * some custom job submission logic.
         * @param jobGraph The JobGraph to be submitted

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 8931a3e..daa5737 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -283,38 +283,23 @@ public class PackagedProgram {
        }
 
        /**
-        * Returns the plan without the required jars when the files are 
already provided by the cluster.
-        *
-        * @return The plan without attached jar files.
-        * @throws ProgramInvocationException
-        */
-       public JobWithJars getPlanWithoutJars() throws 
ProgramInvocationException {
-               if (isUsingProgramEntryPoint()) {
-                       return new JobWithJars(getPlan(), 
Collections.<URL>emptyList(), classpaths, userCodeClassLoader);
-               } else {
-                       throw new ProgramInvocationException("Cannot create a " 
+ JobWithJars.class.getSimpleName() +
-                               " for a program that is using the interactive 
mode.");
-               }
-       }
-
-       /**
         * Returns the plan with all required jars.
-        *
+        * 
         * @return The plan with attached jar files.
-        * @throws ProgramInvocationException
+        * @throws ProgramInvocationException 
         */
        public JobWithJars getPlanWithJars() throws ProgramInvocationException {
                if (isUsingProgramEntryPoint()) {
                        return new JobWithJars(getPlan(), getAllLibraries(), 
classpaths, userCodeClassLoader);
                } else {
-                       throw new ProgramInvocationException("Cannot create a " 
+ JobWithJars.class.getSimpleName() +
+                       throw new ProgramInvocationException("Cannot create a " 
+ JobWithJars.class.getSimpleName() + 
                                        " for a program that is using the 
interactive mode.");
                }
        }
 
        /**
         * Returns the analyzed plan without any optimizations.
-        *
+        * 
         * @return
         *         the analyzed plan without any optimizations.
         * @throws ProgramInvocationException Thrown if an error occurred in the
@@ -324,7 +309,7 @@ public class PackagedProgram {
        public String getPreviewPlan() throws ProgramInvocationException {
                
Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
                List<DataSinkNode> previewPlan;
-
+               
                if (isUsingProgramEntryPoint()) {
                        previewPlan = 
Optimizer.createPreOptimizedPlan(getPlan());
                }
@@ -351,7 +336,7 @@ public class PackagedProgram {
                        finally {
                                env.unsetAsContext();
                        }
-
+                       
                        if (env.previewPlan != null) {
                                previewPlan =  env.previewPlan;
                        } else {
@@ -375,7 +360,7 @@ public class PackagedProgram {
        /**
         * Returns the description provided by the Program class. This
         * may contain a description of the plan itself and its arguments.
-        *
+        * 
         * @return The description of the PactProgram's input parameters.
         * @throws ProgramInvocationException
         *         This invocation is thrown if the Program can't be properly 
loaded. Causes
@@ -383,7 +368,7 @@ public class PackagedProgram {
         */
        public String getDescription() throws ProgramInvocationException {
                if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
-
+                       
                        ProgramDescription descr;
                        if (this.program != null) {
                                descr = (ProgramDescription) this.program;
@@ -395,22 +380,22 @@ public class PackagedProgram {
                                        return null;
                                }
                        }
-
+                       
                        try {
                                return descr.getDescription();
                        }
                        catch (Throwable t) {
-                               throw new ProgramInvocationException("Error 
while getting the program description" +
+                               throw new ProgramInvocationException("Error 
while getting the program description" + 
                                                (t.getMessage() == null ? "." : 
": " + t.getMessage()), t);
                        }
-
+                       
                } else {
                        return null;
                }
        }
-
+       
        /**
-        *
+        * 
         * This method assumes that the context environment is prepared, or the 
execution
         * will be a local execution by default.
         */
@@ -433,16 +418,13 @@ public class PackagedProgram {
 
        /**
         * Gets the {@link java.lang.ClassLoader} that must be used to load 
user code classes.
-        *
+        * 
         * @return The user code ClassLoader.
         */
        public ClassLoader getUserCodeClassLoader() {
                return this.userCodeClassLoader;
        }
 
-       /**
-        * Returns all provided libraries needed to run the program.
-        */
        public List<URL> getAllLibraries() {
                List<URL> libs = new 
ArrayList<URL>(this.extractedTempLibraries.size() + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 296ddc9..3343b69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -28,7 +28,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 
 import java.io.IOException;
-import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
@@ -88,11 +87,6 @@ public class StandaloneClusterClient extends ClusterClient {
        }
 
        @Override
-       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
-               return false;
-       }
-
-       @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader)
                        throws ProgramInvocationException {
                if (isDetached()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index f00013e..fb70280 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Collections
 
 import org.apache.commons.cli.CommandLine
 import org.apache.flink.client.cli.CliFrontendParser
@@ -253,11 +252,7 @@ object FlinkShell {
     val config = frontend.getConfiguration
     val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
 
-    val cluster = customCLI.createCluster(
-      "Flink Scala Shell",
-      options.getCommandLine,
-      config,
-      Collections.emptyList())
+    val cluster = customCLI.createCluster("Flink Scala Shell", 
options.getCommandLine, config)
 
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
     val port = cluster.getJobManagerAddress.getPort

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 77d3149..8ba786f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -332,8 +331,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 
                @Override
                // make method public
-               public ClusterClient createClient(CommandLineOptions options, 
PackagedProgram program) throws Exception {
-                       return super.createClient(options, program);
+               public ClusterClient createClient(CommandLineOptions options, 
String programName) throws Exception {
+                       return super.createClient(options, programName);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 78e16ed..7e612c4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -668,9 +667,9 @@ public abstract class YarnTestBase extends TestLogger {
                public TestingCLI() throws Exception {}
 
                @Override
-               protected ClusterClient createClient(CommandLineOptions 
options, PackagedProgram program) throws Exception {
+               protected ClusterClient createClient(CommandLineOptions 
options, String programName) throws Exception {
                        // mock the returned ClusterClient to disable shutdown 
and verify shutdown behavior later on
-                       originalClusterClient = super.createClient(options, 
program);
+                       originalClusterClient = super.createClient(options, 
programName);
                        spiedClusterClient = Mockito.spy(originalClusterClient);
                        Mockito.doNothing().when(spiedClusterClient).shutdown();
                        return spiedClusterClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 000b2c1..ab1fbc1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -61,8 +61,6 @@ import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
-import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -132,10 +130,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        private String zookeeperNamespace;
 
-       /** Optional Jar file to include in the system class loader of all 
application nodes
-        * (for per-job submission) */
-       private Set<File> userJarFiles;
-
        public AbstractYarnClusterDescriptor() {
                // for unit tests only
                if(System.getenv("IN_TESTS") != null) {
@@ -246,41 +240,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
        }
 
-       /**
-        * Returns true if the descriptor has the job jars to include in the 
classpath.
-        */
-       public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
-               if (userJarFiles == null || userJarFiles.size() != 
requiredJarFiles.size()) {
-                       return false;
-               }
-               try {
-                       for(URL jarFile : requiredJarFiles) {
-                               if (!userJarFiles.contains(new 
File(jarFile.toURI()))) {
-                                       return false;
-                               }
-                       }
-               } catch (URISyntaxException e) {
-                       return false;
-               }
-               return true;
-       }
-
-       /**
-        * Sets the user jar which is included in the system classloader of all 
nodes.
-        */
-       public void setProvidedUserJarFiles(List<URL> userJarFiles) {
-               Set<File> localUserJarFiles = new 
HashSet<>(userJarFiles.size());
-               for (URL jarFile : userJarFiles) {
-                       try {
-                               localUserJarFiles.add(new 
File(jarFile.toURI()));
-                       } catch (URISyntaxException e) {
-                               throw new IllegalArgumentException("Couldn't 
add local user jar: " + jarFile
-                                       + " Currently only file:/// URLs are 
supported.");
-                       }
-               }
-               this.userJarFiles = localUserJarFiles;
-       }
-
        public String getDynamicPropertiesEncoded() {
                return this.dynamicPropertiesEncoded;
        }
@@ -596,11 +555,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
                final ContainerLaunchContext amContainer = 
setupApplicationMasterContainer(hasLogback, hasLog4j);
 
-               // add the user jar to the classpath of the to-be-created 
cluster
-               if (userJarFiles != null) {
-                       effectiveShipFiles.addAll(userJarFiles);
-               }
-
                // Set-up ApplicationSubmissionContext for the application
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
@@ -755,7 +709,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        try {
                                report = yarnClient.getApplicationReport(appId);
                        } catch (IOException e) {
-                               throw new YarnDeploymentException("Failed to 
deploy the cluster.", e);
+                               throw new YarnDeploymentException("Failed to 
deploy the cluster: " + e.getMessage());
                        }
                        YarnApplicationState appState = 
report.getYarnApplicationState();
                        switch(appState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index cd447d7..79501b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -55,7 +55,6 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,11 +194,6 @@ public class YarnClusterClient extends ClusterClient {
        }
 
        @Override
-       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
-               return clusterDescriptor.hasUserJarFiles(userJarFiles);
-       }
-
-       @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
                if (isDetached()) {
                        if (newlyCreatedCluster) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5d3c6f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 4823d35..28d8fb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,7 +29,6 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -49,7 +48,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
-import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -517,16 +515,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        }
 
        @Override
-       public YarnClusterClient createCluster(
-                       String applicationName,
-                       CommandLine cmdLine,
-                       Configuration config,
-                       List<URL> userJarFiles) {
-               Preconditions.checkNotNull(userJarFiles, "User jar files should 
not be null.");
+       public YarnClusterClient createCluster(String applicationName, 
CommandLine cmdLine, Configuration config) {
 
                AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createDescriptor(applicationName, cmdLine);
                yarnClusterDescriptor.setFlinkConfiguration(config);
-               yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
                try {
                        return yarnClusterDescriptor.deploy();

Reply via email to