This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57c0716553b5c8d3c354175952dfcd5c804bdd3d
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jan 7 12:39:59 2021 +0100

    [hotfix] Remove explicit YarnClusterDescriptor.zookeeperNamespace
    
    The YarnClusterDescriptor.zookeeperNamespace has been replaced by the
    configuration which is deployed together with the Yarn cluster. Hence,
    it is no longer needed.
---
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 35 +++++-----------------
 .../java/org/apache/flink/yarn/YarnConfigKeys.java |  1 -
 .../flink/yarn/entrypoint/YarnEntrypointUtils.java |  7 -----
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java |  6 ++--
 4 files changed, 9 insertions(+), 40 deletions(-)

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index e2b373a..f31cfc1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -153,8 +153,6 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
 
     private final String applicationType;
 
-    private String zookeeperNamespace;
-
     private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
     public YarnClusterDescriptor(
@@ -183,10 +181,6 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         this.customName = 
flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
         this.applicationType = 
flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
         this.nodeLabel = 
flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
-
-        // we want to ignore the default value at this point.
-        this.zookeeperNamespace =
-                
flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
     }
 
     private Optional<List<File>> decodeFilesToShipToCluster(
@@ -358,14 +352,6 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         }
     }
 
-    public String getZookeeperNamespace() {
-        return zookeeperNamespace;
-    }
-
-    private void setZookeeperNamespace(String zookeeperNamespace) {
-        this.zookeeperNamespace = zookeeperNamespace;
-    }
-
     public String getNodeLabel() {
         return nodeLabel;
     }
@@ -824,17 +810,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         final ApplicationId appId = appContext.getApplicationId();
 
         // ------------------ Add Zookeeper namespace to local 
flinkConfiguraton ------
-        String zkNamespace = getZookeeperNamespace();
-        // no user specified cli argument for namespace?
-        if (zkNamespace == null || zkNamespace.isEmpty()) {
-            // namespace defined in config? else use applicationId as default.
-            zkNamespace =
-                    configuration.getString(
-                            HighAvailabilityOptions.HA_CLUSTER_ID, 
String.valueOf(appId));
-            setZookeeperNamespace(zkNamespace);
-        }
-
-        configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
+        setHAClusterIdIfNotSet(configuration, appId);
 
         if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
             // activate re-execution of failed applications
@@ -1133,7 +1109,6 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                 YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
                 encodeYarnLocalResourceDescriptorListToString(
                         fileUploader.getEnvShipResourceList()));
-        appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, 
getZookeeperNamespace());
         appMasterEnv.put(
                 YarnConfigKeys.FLINK_YARN_FILES,
                 fileUploader.getApplicationDir().toUri().toString());
@@ -1781,9 +1756,13 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
 
         flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, 
ConverterUtils.toString(appId));
 
+        setHAClusterIdIfNotSet(flinkConfiguration, appId);
+    }
+
+    private void setHAClusterIdIfNotSet(Configuration configuration, 
ApplicationId appId) {
         // set cluster-id to app id if not specified
-        if 
(!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
-            flinkConfiguration.set(
+        if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+            configuration.set(
                     HighAvailabilityOptions.HA_CLUSTER_ID, 
ConverterUtils.toString(appId));
         }
     }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index fa5c0f0..96bf808 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -40,7 +40,6 @@ public class YarnConfigKeys {
     public static final String LOCAL_KEYTAB_PATH = "_LOCAL_KEYTAB_PATH";
     public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
     public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
-    public static final String ENV_ZOOKEEPER_NAMESPACE = 
"_ZOOKEEPER_NAMESPACE";
 
     public static final String ENV_KRB5_PATH = "_KRB5_PATH";
     public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH";
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index a013624..959459e 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn.entrypoint;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
@@ -58,8 +57,6 @@ public class YarnEntrypointUtils {
 
         final String keytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
 
-        final String zooKeeperNamespace = 
env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
-
         final String hostname = 
env.get(ApplicationConstants.Environment.NM_HOST.key());
         Preconditions.checkState(
                 hostname != null,
@@ -69,10 +66,6 @@ public class YarnEntrypointUtils {
         configuration.setString(JobManagerOptions.ADDRESS, hostname);
         configuration.setString(RestOptions.ADDRESS, hostname);
 
-        if (zooKeeperNamespace != null) {
-            configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zooKeeperNamespace);
-        }
-
         // if a web monitor shall be started, set the port to random binding
         if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
             configuration.setInteger(WebOptions.PORT, 0);
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 9132cec..74c9f22 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -160,11 +160,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
         CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, 
true);
 
         Configuration executorConfig = yarnCLI.toConfiguration(commandLine);
-        ClusterClientFactory<ApplicationId> clientFactory = 
getClusterClientFactory(executorConfig);
-        YarnClusterDescriptor descriptor =
-                (YarnClusterDescriptor) 
clientFactory.createClusterDescriptor(executorConfig);
 
-        assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
+        assertThat(
+                executorConfig.get(HighAvailabilityOptions.HA_CLUSTER_ID), 
is(zkNamespaceCliInput));
     }
 
     @Test

Reply via email to