yanghua closed pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable
URL: https://github.com/apache/flink/pull/5617
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub does not display its
diff once the pull request has been merged, so the diff is supplied below:

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 4d2aaa02efe..ff74e4d358e 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -21,15 +21,12 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Yarn client which starts a {@link TestingApplicationMaster}. Additionally 
the client adds the
@@ -50,25 +47,6 @@ public TestingYarnClusterDescriptor(
                        configurationDirectory,
                        yarnClient,
                        sharedYarnClient);
-               List<File> filesToShip = new ArrayList<>();
-
-               File testingJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn-tests"));
-               Preconditions.checkNotNull(testingJar, "Could not find the 
flink-yarn-tests tests jar. " +
-                       "Make sure to package the flink-yarn-tests module.");
-
-               File testingRuntimeJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-runtime"));
-               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
-                       "jar. Make sure to package the flink-runtime module.");
-
-               File testingYarnJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn"));
-               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-yarn tests " +
-                       "jar. Make sure to package the flink-yarn module.");
-
-               filesToShip.add(testingJar);
-               filesToShip.add(testingRuntimeJar);
-               filesToShip.add(testingYarnJar);
-
-               addShipFiles(filesToShip);
        }
 
        @Override
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index f9c03f93784..18e8e6b58a5 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -34,12 +34,13 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
@@ -50,7 +51,6 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -110,29 +110,50 @@ public void testMultipleAMKill() throws Exception {
                final int numberKillingAttempts = numberApplicationAttempts - 1;
                String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
-               TestingYarnClusterDescriptor flinkYarnClient = new 
TestingYarnClusterDescriptor(
-                       configuration,
-                       getYarnConfiguration(),
-                       confDirPath,
-                       getYarnClient(),
-                       true);
 
-               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+               configuration.setString(YarnConfigOptions.FLINK_JAR, 
flinkUberjar.getAbsolutePath());
 
-               String fsStateHandlePath = temp.getRoot().getPath();
+               StringBuilder sb = new StringBuilder();
+               for (File file : flinkLibFolder.listFiles()) {
+                       sb.append(file.getAbsolutePath());
+                       sb.append(",");
+               }
 
-               // load the configuration
-               File configDirectory = new File(confDirPath);
-               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+               String linkedShipFiles = sb.toString();
+
+               File testingJar = YarnTestBase.findFile("..", new 
TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+               Preconditions.checkNotNull(testingJar, "Could not find the 
flink-yarn-tests tests jar. " +
+                       "Make sure to package the flink-yarn-tests module.");
+               linkedShipFiles += testingJar.getAbsolutePath();
+
+               File testingRuntimeJar = YarnTestBase.findFile("..", new 
TestingYarnClusterDescriptor.TestJarFinder("flink-runtime"));
+               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
+                       "jar. Make sure to package the flink-runtime module.");
+               linkedShipFiles += ("," + testingRuntimeJar.getAbsolutePath());
+
+               File testingYarnJar = YarnTestBase.findFile("..", new 
TestingYarnClusterDescriptor.TestJarFinder("flink-yarn"));
+               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-yarn tests " +
+                       "jar. Make sure to package the flink-yarn module.");
+               linkedShipFiles += ("," + testingYarnJar.getAbsolutePath());
 
-               
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
+               configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, 
linkedShipFiles);
+
+               String fsStateHandlePath = temp.getRoot().getPath();
+               
configuration.setString(YarnConfigOptions.DYNAMIC_PROPERTIES_ENCODED, 
"recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
                        zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
                        "@@" + CheckpointingOptions.STATE_BACKEND.key() + 
"=FILESYSTEM" +
                        "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" 
+ fsStateHandlePath + "/checkpoints" +
                        "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
"=" + fsStateHandlePath + "/recovery");
 
+               TestingYarnClusterDescriptor flinkYarnClient = new 
TestingYarnClusterDescriptor(
+                       configuration, getYarnConfiguration(), confDirPath, 
getYarnClient(), true);
+
+               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
+
+               // load the configuration
+               File configDirectory = new File(confDirPath);
+               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
                ClusterClient<ApplicationId> yarnCluster = null;
 
                final FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index ef6706ad5fe..9c193d91889 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -27,8 +27,8 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
@@ -36,7 +36,6 @@
 import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.Random;
 
 /**
@@ -55,7 +54,19 @@ public static void setup() {
        public void testPerJobMode() throws Exception {
                Configuration configuration = new Configuration();
                configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-               final YarnClient yarnClient = getYarnClient();
+               configuration.setString(YarnConfigOptions.FLINK_JAR, 
flinkUberjar.getAbsolutePath());
+
+               StringBuilder sb = new StringBuilder();
+               for (File file : flinkLibFolder.listFiles()) {
+                       sb.append(file.getAbsolutePath());
+                       sb.append(",");
+               }
+
+               String linkedShipFiles = sb.toString();
+               linkedShipFiles = linkedShipFiles.substring(0, 
linkedShipFiles.length() - 1);
+               configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, 
linkedShipFiles);
+
+               final YarnClient yarnClient = YarnClient.createYarnClient();
 
                try (final Flip6YarnClusterDescriptor 
flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
                        configuration,
@@ -64,9 +75,6 @@ public void testPerJobMode() throws Exception {
                        yarnClient,
                        true)) {
 
-                       flip6YarnClusterDescriptor.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-                       
flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
                        final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
                                .setMasterMemoryMB(768)
                                .setTaskManagerMemoryMB(1024)
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index b3dcaca1459..5978ad64ce9 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -27,7 +27,6 @@
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -46,7 +45,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -231,6 +229,17 @@ public void testJavaAPI() throws Exception {
 
                String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                Configuration configuration = 
GlobalConfiguration.loadConfiguration();
+               configuration.setString(YarnConfigOptions.FLINK_JAR, 
flinkUberjar.getAbsolutePath());
+
+               StringBuilder sb = new StringBuilder();
+               for (File file : flinkLibFolder.listFiles()) {
+                       sb.append(file.getAbsolutePath());
+                       sb.append(",");
+               }
+
+               String linkedShipFiles = sb.toString();
+               linkedShipFiles = linkedShipFiles.substring(0, 
linkedShipFiles.length() - 1);
+               configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, 
linkedShipFiles);
 
                try (final AbstractYarnClusterDescriptor clusterDescriptor = 
new YarnClusterDescriptor(
                        configuration,
@@ -239,8 +248,6 @@ public void testJavaAPI() throws Exception {
                        getYarnClient(),
                        true)) {
                        Assert.assertNotNull("unable to get yarn client", 
clusterDescriptor);
-                       clusterDescriptor.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-                       
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
                        final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
                                .setMasterMemoryMB(768)
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2a1b099399a..fe11ab5454e 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -37,8 +37,8 @@
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -51,7 +51,6 @@
 
 import java.io.File;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -88,6 +87,17 @@ public void testFlinkContainerMemory() throws Exception {
                
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 
0);
                
configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, (1L << 
20));
                
configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, (4L << 
20));
+               configuration.setString(YarnConfigOptions.FLINK_JAR, 
flinkUberjar.getAbsolutePath());
+
+               StringBuilder sb = new StringBuilder();
+               for (File file : flinkLibFolder.listFiles()) {
+                       sb.append(file.getAbsolutePath());
+                       sb.append(",");
+               }
+
+               String linkedShipFiles = sb.toString();
+               linkedShipFiles = linkedShipFiles.substring(0, 
linkedShipFiles.length() - 1);
+               configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, 
linkedShipFiles);
 
                final YarnConfiguration yarnConfiguration = 
getYarnConfiguration();
                final Flip6YarnClusterDescriptor clusterDescriptor = new 
Flip6YarnClusterDescriptor(
@@ -97,9 +107,6 @@ public void testFlinkContainerMemory() throws Exception {
                        yarnClient,
                        true);
 
-               clusterDescriptor.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
                final File streamingWordCountFile = new 
File("target/programs/WindowJoin.jar");
 
                assertThat(streamingWordCountFile.exists(), is(true));
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index bdb471a142f..91cd459ba91 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -79,10 +79,13 @@
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.SimpleFileVisitor;
@@ -99,6 +102,7 @@
 import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static 
org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
 import static 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
@@ -122,12 +126,15 @@
        /** True if the descriptor must not shut down the YarnClient. */
        private final boolean sharedYarnClient;
 
+       @Nullable
        private String yarnQueue;
 
        private String configurationDirectory;
 
+       @Nullable
        private Path flinkJarPath;
 
+       @Nullable
        private String dynamicPropertiesEncoded;
 
        /** Lazily initialized list of files to ship. */
@@ -137,8 +144,10 @@
 
        private boolean detached;
 
+       @Nullable
        private String customName;
 
+       @Nullable
        private String zookeeperNamespace;
 
        /** Optional Jar file to include in the system class loader of all 
application nodes
@@ -172,6 +181,88 @@ public AbstractYarnClusterDescriptor(
                userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
                this.configurationDirectory = 
Preconditions.checkNotNull(configurationDirectory);
+
+               String yarnQueueConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.YARN_QUEUE);
+               if (yarnQueueConfigValue != null) {
+                       this.yarnQueue = yarnQueueConfigValue;
+               }
+
+               String zkNamespaceConfigValue = 
flinkConfiguration.getString(HA_CLUSTER_ID);
+               if (zkNamespaceConfigValue != null) {
+                       this.zookeeperNamespace = zkNamespaceConfigValue;
+               }
+
+               this.detached = 
flinkConfiguration.getBoolean(YarnConfigOptions.DETACHED_MODE);
+
+               String dynamicPropertiesEncodedConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.DYNAMIC_PROPERTIES_ENCODED);
+               if (dynamicPropertiesEncodedConfigValue != null) {
+                       this.dynamicPropertiesEncoded = 
dynamicPropertiesEncodedConfigValue;
+               }
+
+               String nameConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.YARN_APPLICATION_NAME);
+               if (nameConfigValue != null) {
+                       this.customName = nameConfigValue;
+               }
+
+               buildFlinkJarPath(flinkConfiguration);
+
+               this.addShipFiles();
+       }
+
+       private void buildFlinkJarPath(Configuration flinkConfiguration) {
+               String flinkJarPathStrConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.FLINK_JAR);
+               final Path localJarPath;
+               if (flinkJarPathStrConfigValue != null) {
+                       if (!flinkJarPathStrConfigValue.startsWith("file://")) {
+                               flinkJarPathStrConfigValue = "file://" + 
flinkJarPathStrConfigValue;
+                       }
+
+                       if (!flinkJarPathStrConfigValue.endsWith("jar")) {
+                               LOG.error("The passed jar path ('" + 
flinkJarPathStrConfigValue + "') does not end with the 'jar' extension");
+                               localJarPath = null;
+                       } else {
+                               localJarPath = new 
Path(flinkJarPathStrConfigValue);
+                       }
+               } else {
+                       LOG.info("No path for the flink jar passed. Using the 
location of "
+                               + this.getClass() + " to locate the jar");
+                       String encodedJarPath =
+                               
this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+
+                       final String decodedPath;
+                       try {
+                               // we have to decode the url encoded parts of 
the path
+                               decodedPath = URLDecoder.decode(encodedJarPath, 
Charset.defaultCharset().name());
+                       } catch (UnsupportedEncodingException e) {
+                               throw new RuntimeException("Couldn't decode the 
encoded Flink dist jar path: " + encodedJarPath +
+                                       " Please supply a path manually via the 
jar option.");
+                       }
+
+                       // check whether it's actually a jar file --> when 
testing we execute this class without a flink-dist jar
+                       if (decodedPath.endsWith(".jar")) {
+                               localJarPath = new Path(new 
File(decodedPath).toURI());
+                       } else {
+                               localJarPath = null;
+                       }
+               }
+
+               if (localJarPath != null) {
+                       this.flinkJarPath = localJarPath;
+               }
+       }
+
+       private void addShipFiles() {
+               List<File> shipFiles = new ArrayList<>();
+               // path to directory to ship
+               String shipPathConfigValue = 
flinkConfiguration.getString(YarnConfigOptions.YARN_SHIP_PATHS);
+               if (shipPathConfigValue != null) {
+                       String[] shipPaths = shipPathConfigValue.split(",");
+                       for (String shipPath : shipPaths) {
+                               shipFiles.add(new File(shipPath));
+                       }
+               }
+
+               this.shipFiles.addAll(shipFiles);
        }
 
        public YarnClient getYarnClient() {
@@ -194,34 +285,6 @@ public Configuration getFlinkConfiguration() {
                return flinkConfiguration;
        }
 
-       public void setQueue(String queue) {
-               this.yarnQueue = queue;
-       }
-
-       public void setLocalJarPath(Path localJarPath) {
-               if (!localJarPath.toString().endsWith("jar")) {
-                       throw new IllegalArgumentException("The passed jar path 
('" + localJarPath + "') does not end with the 'jar' extension");
-               }
-               this.flinkJarPath = localJarPath;
-       }
-
-       /**
-        * Adds the given files to the list of files to ship.
-        *
-        * <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be 
excluded from the upload by
-        * {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, 
ApplicationId, List, Map, StringBuilder)}
-        * since we upload the Flink uber jar ourselves and do not need to 
deploy it multiple times.
-        *
-        * @param shipFiles files to ship
-        */
-       public void addShipFiles(List<File> shipFiles) {
-               this.shipFiles.addAll(shipFiles);
-       }
-
-       public void setDynamicPropertiesEncoded(String 
dynamicPropertiesEncoded) {
-               this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
-       }
-
        /**
         * Returns true if the descriptor has the job jars to include in the 
classpath.
         */
@@ -247,7 +310,7 @@ public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
        /**
         * Sets the user jar which is included in the system classloader of all 
nodes.
         */
-       public void setProvidedUserJarFiles(List<URL> userJarFiles) {
+       private void setProvidedUserJarFiles(List<URL> userJarFiles) {
                for (URL jarFile : userJarFiles) {
                        try {
                                this.userJarFiles.add(new 
File(jarFile.toURI()));
@@ -258,10 +321,21 @@ public void setProvidedUserJarFiles(List<URL> 
userJarFiles) {
                }
        }
 
+       @Nullable
        public String getDynamicPropertiesEncoded() {
                return this.dynamicPropertiesEncoded;
        }
 
+       @Nullable
+       public Path getFlinkJarPath() {
+               return flinkJarPath;
+       }
+
+       @Nullable
+       public String getCustomName() {
+               return customName;
+       }
+
        private void isReadyForDeployment(ClusterSpecification 
clusterSpecification) throws YarnDeploymentException {
 
                if (clusterSpecification.getNumberTaskManagers() <= 0) {
@@ -311,10 +385,6 @@ private static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
                return false;
        }
 
-       public void setDetachedMode(boolean detachedMode) {
-               this.detached = detachedMode;
-       }
-
        public boolean isDetachedMode() {
                return detached;
        }
@@ -323,10 +393,6 @@ public String getZookeeperNamespace() {
                return zookeeperNamespace;
        }
 
-       public void setZookeeperNamespace(String zookeeperNamespace) {
-               this.zookeeperNamespace = zookeeperNamespace;
-       }
-
        // -------------------------------------------------------------
        // Lifecycle management
        // -------------------------------------------------------------
@@ -708,7 +774,7 @@ public ApplicationReport startAppMaster(
                if (zkNamespace == null || zkNamespace.isEmpty()) {
                        // namespace defined in config? else use applicationId 
as default.
                        zkNamespace = 
configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, 
String.valueOf(appId));
-                       setZookeeperNamespace(zkNamespace);
+                       this.zookeeperNamespace = zkNamespace;
                }
 
                configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
@@ -1267,13 +1333,6 @@ public String getClusterDescription() {
                }
        }
 
-       public void setName(String name) {
-               if (name == null) {
-                       throw new IllegalArgumentException("The passed name is 
null");
-               }
-               customName = name;
-       }
-
        private void 
activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
                InvocationTargetException, IllegalAccessException {
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 2cdc19d3c93..5b769dd1f62 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -49,7 +49,6 @@
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -70,10 +69,6 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -190,7 +185,7 @@ public FlinkYarnSessionCli(
                query = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
                applicationId = new Option(shortPrefix + "id", longPrefix + 
"applicationId", true, "Attach to running YARN session");
                queue = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
-               shipPath = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
+               shipPath = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory, multi directory split them with 
comma (t for transfer)");
                flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
                jmMemory = new Option(shortPrefix + "jm", longPrefix + 
"jobManagerMemory", true, "Memory for JobManager Container [in MB]");
                tmMemory = new Option(shortPrefix + "tm", longPrefix + 
"taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
@@ -276,98 +271,6 @@ private AbstractYarnClusterDescriptor createDescriptor(
                        yarnConfiguration,
                        configurationDirectory);
 
-               // Jar Path
-               final Path localJarPath;
-               if (cmd.hasOption(flinkJar.getOpt())) {
-                       String userPath = cmd.getOptionValue(flinkJar.getOpt());
-                       if (!userPath.startsWith("file://")) {
-                               userPath = "file://" + userPath;
-                       }
-                       localJarPath = new Path(userPath);
-               } else {
-                       LOG.info("No path for the flink jar passed. Using the 
location of "
-                               + yarnClusterDescriptor.getClass() + " to 
locate the jar");
-                       String encodedJarPath =
-                               
yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-
-                       final String decodedPath;
-                       try {
-                               // we have to decode the url encoded parts of 
the path
-                               decodedPath = URLDecoder.decode(encodedJarPath, 
Charset.defaultCharset().name());
-                       } catch (UnsupportedEncodingException e) {
-                               throw new RuntimeException("Couldn't decode the 
encoded Flink dist jar path: " + encodedJarPath +
-                                       " Please supply a path manually via the 
-" + flinkJar.getOpt() + " option.");
-                       }
-
-                       // check whether it's actually a jar file --> when 
testing we execute this class without a flink-dist jar
-                       if (decodedPath.endsWith(".jar")) {
-                               localJarPath = new Path(new 
File(decodedPath).toURI());
-                       } else {
-                               localJarPath = null;
-                       }
-               }
-
-               if (localJarPath != null) {
-                       yarnClusterDescriptor.setLocalJarPath(localJarPath);
-               }
-
-               List<File> shipFiles = new ArrayList<>();
-               // path to directory to ship
-               if (cmd.hasOption(shipPath.getOpt())) {
-                       String shipPath = 
cmd.getOptionValue(this.shipPath.getOpt());
-                       File shipDir = new File(shipPath);
-                       if (shipDir.isDirectory()) {
-                               shipFiles.add(shipDir);
-                       } else {
-                               LOG.warn("Ship directory is not a directory. 
Ignoring it.");
-                       }
-               }
-
-               yarnClusterDescriptor.addShipFiles(shipFiles);
-
-               // queue
-               if (cmd.hasOption(queue.getOpt())) {
-                       
yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
-               }
-
-               final Properties properties = 
cmd.getOptionProperties(dynamicproperties.getOpt());
-
-               String[] dynamicProperties = 
properties.stringPropertyNames().stream()
-                       .flatMap(
-                               (String key) -> {
-                                       final String value = 
properties.getProperty(key);
-
-                                       if (value != null) {
-                                               return Stream.of(key + 
dynamicproperties.getValueSeparator() + value);
-                                       } else {
-                                               return Stream.empty();
-                                       }
-                               })
-                       .toArray(String[]::new);
-
-               String dynamicPropertiesEncoded = 
StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
-               
yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
-               if (cmd.hasOption(detached.getOpt()) || 
cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
-                       this.detachedMode = true;
-                       yarnClusterDescriptor.setDetachedMode(true);
-               }
-
-               if (cmd.hasOption(name.getOpt())) {
-                       
yarnClusterDescriptor.setName(cmd.getOptionValue(name.getOpt()));
-               } else {
-                       // set the default application name, if none is 
specified
-                       if (defaultApplicationName != null) {
-                               
yarnClusterDescriptor.setName(defaultApplicationName);
-                       }
-               }
-
-               if (cmd.hasOption(zookeeperNamespace.getOpt())) {
-                       String zookeeperNamespaceValue = 
cmd.getOptionValue(this.zookeeperNamespace.getOpt());
-                       
yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespaceValue);
-               }
-
                return yarnClusterDescriptor;
        }
 
@@ -491,14 +394,12 @@ protected Configuration 
applyCommandLineOptionsToConfiguration(CommandLine comma
 
                final ApplicationId applicationId = getClusterId(commandLine);
 
-               if (applicationId != null) {
-                       final String zooKeeperNamespace;
-                       if (commandLine.hasOption(zookeeperNamespace.getOpt())){
-                               zooKeeperNamespace = 
commandLine.getOptionValue(zookeeperNamespace.getOpt());
-                       } else {
-                               zooKeeperNamespace = 
effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
-                       }
-
+               final String zooKeeperNamespace;
+               if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
+                       zooKeeperNamespace = 
commandLine.getOptionValue(zookeeperNamespace.getOpt());
+                       effectiveConfiguration.setString(HA_CLUSTER_ID, 
zooKeeperNamespace);
+               } else if (applicationId != null) {
+                       zooKeeperNamespace = 
effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
                        effectiveConfiguration.setString(HA_CLUSTER_ID, 
zooKeeperNamespace);
                }
 
@@ -514,6 +415,49 @@ protected Configuration 
applyCommandLineOptionsToConfiguration(CommandLine comma
                        
effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
                }
 
+               // queue
+               if (commandLine.hasOption(queue.getOpt())) {
+                       
effectiveConfiguration.setString(YarnConfigOptions.YARN_QUEUE, 
commandLine.getOptionValue(queue.getOpt()));
+               }
+
+               if (commandLine.hasOption(detached.getOpt()) || 
commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+                       this.detachedMode = true;
+                       
effectiveConfiguration.setBoolean(YarnConfigOptions.DETACHED_MODE, true);
+               }
+
+               final Properties properties = 
commandLine.getOptionProperties(dynamicproperties.getOpt());
+
+               if (properties != null) {
+                       String[] dynamicProperties = 
properties.stringPropertyNames().stream()
+                               .flatMap(
+                                       (String key) -> {
+                                               final String value = 
properties.getProperty(key);
+
+                                               if (value != null) {
+                                                       return Stream.of(key + 
dynamicproperties.getValueSeparator() + value);
+                                               } else {
+                                                       return Stream.empty();
+                                               }
+                                       })
+                               .toArray(String[]::new);
+
+                       String dynamicPropertiesEncoded = 
StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+                       
effectiveConfiguration.setString(YarnConfigOptions.DYNAMIC_PROPERTIES_ENCODED, 
dynamicPropertiesEncoded);
+               }
+
+               if (commandLine.hasOption(flinkJar.getOpt())) {
+                       
effectiveConfiguration.setString(YarnConfigOptions.FLINK_JAR, 
commandLine.getOptionValue(flinkJar.getOpt()));
+               }
+
+               if (commandLine.hasOption(name.getOpt())) {
+                       
effectiveConfiguration.setString(YarnConfigOptions.YARN_APPLICATION_NAME, 
commandLine.getOptionValue(name.getOpt()));
+               }
+
+               if (commandLine.hasOption(shipPath.getOpt())) {
+                       
effectiveConfiguration.setString(YarnConfigOptions.YARN_SHIP_PATHS, 
commandLine.getOptionValue(this.shipPath.getOpt()));
+               }
+
                if (isYarnPropertiesFileMode(commandLine)) {
                        return applyYarnProperties(effectiveConfiguration);
                } else {
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 255a6c71f8a..a92cbd615e4 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -142,6 +142,54 @@
                .defaultValue("")
                .withDescription("A comma-separated list of tags to apply to 
the Flink YARN application.");
 
+       /**
+        * A command option to specify the YARN queue to which the application 
will be submitted.
+        */
+       public static final ConfigOption<String> YARN_QUEUE =
+               key("yarn.queue")
+               .noDefaultValue()
+               .withDescription("A command option to specify the application 
will submit to which YARN queue.");
+
+       /**
+        * A command option to specify whether the application submission uses 
detached mode or not.
+        */
+       public static final ConfigOption<Boolean> DETACHED_MODE =
+               key("yarn.detached-mode")
+               .defaultValue(false)
+               .withDescription("A command option to specify whether the 
application submission uses detached mode or not.");
+
+       /**
+        * A command option to specify the encoded dynamic properties.
+        */
+       public static final ConfigOption<String> DYNAMIC_PROPERTIES_ENCODED =
+               key("yarn.dynamic-properties-encoded")
+               .noDefaultValue()
+               .withDescription("A command option to specify the encoded 
dynamic properties.");
+
+       /**
+        * A command option to specify the flink jar path.
+        */
+       public static final ConfigOption<String> FLINK_JAR =
+               key("yarn.flink-jar")
+               .noDefaultValue()
+               .withDescription("A command option to specify the flink jar 
path.");
+
+       /**
+        * A command option to specify the YARN flink application name.
+        */
+       public static final ConfigOption<String> YARN_APPLICATION_NAME =
+               key("yarn.application-name")
+               .noDefaultValue()
+               .withDescription("A command option to specify the YARN flink 
application name.");
+
+       /**
+        * A command option which contains a comma-separated list of ship paths.
+        */
+       public static final ConfigOption<String> YARN_SHIP_PATHS =
+               key("yarn.ship-paths")
+               .noDefaultValue()
+               .withDescription("A comma-separated list of ship paths.");
+
        // 
------------------------------------------------------------------------
 
        /** This class is not meant to be instantiated. */
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 20ce314399f..c66e2180dd4 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -137,6 +137,42 @@ public void testZookeeperNamespaceProperty() throws 
Exception {
                assertEquals(zkNamespaceCliInput, 
descriptor.getZookeeperNamespace());
        }
 
+       @Test
+       public void testNameProperty() throws Exception {
+               String testName = "testFlinkApplication";
+               String[] params = new String[] {"-yn", "2", "-ynm", testName};
+
+               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+                       new Configuration(),
+                       tmp.getRoot().getAbsolutePath(),
+                       "y",
+                       "yarn");
+
+               CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
+
+               AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createClusterDescriptor(commandLine);
+
+               assertEquals(testName, descriptor.getCustomName());
+       }
+
+       @Test
+       public void testFlinkJarPathProperty() throws Exception {
+               String testFlinkJarPathCliInput = "/path/to/flink.jar";
+               String[] params = new String[] {"-yn", "2", "-yj", 
testFlinkJarPathCliInput};
+
+               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+                       new Configuration(),
+                       tmp.getRoot().getAbsolutePath(),
+                       "y",
+                       "yarn");
+
+               CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
+
+               AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createClusterDescriptor(commandLine);
+
+               assertEquals(testFlinkJarPathCliInput, new 
File(descriptor.getFlinkJarPath().toUri()).getAbsolutePath());
+       }
+
        /**
         * Test that the CliFrontend is able to pick up the .yarn-properties 
file from a specified location.
         */
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index dd8b62536c2..9abdb040f1c 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,7 +29,6 @@
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -44,10 +43,8 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -90,16 +87,18 @@ public static void tearDownClass() {
 
        @Test
        public void testFailIfTaskSlotsHigherThanMaxVcores() throws 
ClusterDeploymentException {
+               final YarnClient yarnClient = YarnClient.createYarnClient();
+
+               Configuration flinkConfig = new Configuration();
+               flinkConfig.setString(YarnConfigOptions.FLINK_JAR, 
flinkJar.getPath());
 
                YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
-                       new Configuration(),
+                       flinkConfig,
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
                        yarnClient,
                        true);
 
-               clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
-
                ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
                        .setMasterMemoryMB(-1)
                        .setTaskManagerMemoryMB(-1)
@@ -126,6 +125,7 @@ public void testConfigOverwrite() throws 
ClusterDeploymentException {
                Configuration configuration = new Configuration();
                // overwrite vcores in config
                configuration.setInteger(YarnConfigOptions.VCORES, 
Integer.MAX_VALUE);
+               configuration.setString(YarnConfigOptions.FLINK_JAR, 
flinkJar.getPath());
 
                YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
                        configuration,
@@ -134,8 +134,6 @@ public void testConfigOverwrite() throws 
ClusterDeploymentException {
                        yarnClient,
                        true);
 
-               clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
-
                // configure slots
                ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
                        .setMasterMemoryMB(-1)
@@ -412,28 +410,38 @@ public void testSetupApplicationMasterContainer() {
         */
        @Test
        public void testExplicitLibShipping() throws Exception {
+               Configuration flinkConfiguration = new Configuration();
+
+               File libFile = temporaryFolder.newFile("libFile.jar");
+               File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+
                AbstractYarnClusterDescriptor descriptor = new 
YarnClusterDescriptor(
-                       new Configuration(),
+                       flinkConfiguration,
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
                        yarnClient,
                        true);
 
                try {
-                       descriptor.setLocalJarPath(new 
Path("/path/to/flink.jar"));
-
-                       File libFile = temporaryFolder.newFile("libFile.jar");
-                       File libFolder = 
temporaryFolder.newFolder().getAbsoluteFile();
-
                        
Assert.assertFalse(descriptor.shipFiles.contains(libFile));
                        
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+               } finally {
+                       descriptor.close();
+               }
 
-                       List<File> shipFiles = new ArrayList<>();
-                       shipFiles.add(libFile);
-                       shipFiles.add(libFolder);
+               flinkConfiguration = new Configuration();
+               flinkConfiguration.setString(YarnConfigOptions.FLINK_JAR, 
"/path/to/flink.jar");
+               String shipFilePaths = libFile.getAbsolutePath() + "," + 
libFolder.getAbsolutePath();
+               flinkConfiguration.setString(YarnConfigOptions.YARN_SHIP_PATHS, 
shipFilePaths);
 
-                       descriptor.addShipFiles(shipFiles);
+               descriptor = new YarnClusterDescriptor(
+                       flinkConfiguration,
+                       yarnConfiguration,
+                       temporaryFolder.getRoot().getAbsolutePath(),
+                       yarnClient,
+                       true);
 
+               try {
                        
Assert.assertTrue(descriptor.shipFiles.contains(libFile));
                        
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to