This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 46514f38a2e364da30a96444ece45eceb1e09762
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jan 7 11:02:20 2021 +0100

    [FLINK-20866][yarn] Set high-availability.cluster-id to application id if 
not configured
    
    In order to easily connect to an HA enabled Yarn cluster, this commit sets 
the
    high-availability.cluster-id to the application id if not configured. This 
is
    symmetric to how we deploy HA enabled clusters and makes it unnecessary to
    explicitly set the high-availability.cluster-id if one tries to reconnect
    to the cluster.
    
    This closes #14577.
---
 docs/deployment/resource-providers/yarn.md         |  6 +++-
 docs/deployment/resource-providers/yarn.zh.md      |  6 +++-
 .../flink/yarn/YARNHighAvailabilityITCase.java     | 36 ++++++++++++++++++++++
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 13 +++++---
 4 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/docs/deployment/resource-providers/yarn.md 
b/docs/deployment/resource-providers/yarn.md
index 759d203..3d8837d 100644
--- a/docs/deployment/resource-providers/yarn.md
+++ b/docs/deployment/resource-providers/yarn.md
@@ -194,7 +194,11 @@ Once a HA service is configured, it will persist 
JobManager metadata and perform
 
 YARN is taking care of restarting failed JobManagers. The maximum number of 
JobManager restarts is defined through two configuration parameters. First 
Flink's [yarn.application-attempts]({% link deployment/config.md 
%}#yarn-application-attempts) configuration will default 2. This value is 
limited by YARN's 
[yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml),
 which also defaults to 2.
 
-Note that Flink is managing the `high-availability.cluster-id` configuration 
parameter when running on YARN. **You should not overwrite this parameter when 
running an HA cluster on YARN**. The cluster ID is used to distinguish multiple 
HA clusters in the HA backend (for example Zookeeper). Overwriting this 
configuration parameter can lead to multiple YARN clusters affecting each other.
+Note that Flink is managing the `high-availability.cluster-id` configuration 
parameter when deploying on YARN.
+Flink sets it per default to the YARN application id.
+**You should not overwrite this parameter when deploying an HA cluster on 
YARN**. 
+The cluster ID is used to distinguish multiple HA clusters in the HA backend 
(for example Zookeeper). 
+Overwriting this configuration parameter can lead to multiple YARN clusters 
affecting each other.
 
 #### Container Shutdown Behaviour
 
diff --git a/docs/deployment/resource-providers/yarn.zh.md 
b/docs/deployment/resource-providers/yarn.zh.md
index 0cb589b..72e4505 100644
--- a/docs/deployment/resource-providers/yarn.zh.md
+++ b/docs/deployment/resource-providers/yarn.zh.md
@@ -194,7 +194,11 @@ Once a HA service is configured, it will persist 
JobManager metadata and perform
 
 YARN is taking care of restarting failed JobManagers. The maximum number of 
JobManager restarts is defined through two configuration parameters. First 
Flink's [yarn.application-attempts]({% link deployment/config.zh.md 
%}#yarn-application-attempts) configuration will default 2. This value is 
limited by YARN's 
[yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml),
 which also defaults to 2.
 
-Note that Flink is managing the `high-availability.cluster-id` configuration 
parameter when running on YARN. **You should not overwrite this parameter when 
running an HA cluster on YARN**. The cluster ID is used to distinguish multiple 
HA clusters in the HA backend (for example Zookeeper). Overwriting this 
configuration parameter can lead to multiple YARN clusters affecting each other.
+Note that Flink is managing the `high-availability.cluster-id` configuration 
parameter when deploying on YARN.
+Flink sets it per default to the YARN application id.
+**You should not overwrite this parameter when deploying an HA cluster on 
YARN**.
+The cluster ID is used to distinguish multiple HA clusters in the HA backend 
(for example Zookeeper).
+Overwriting this configuration parameter can lead to multiple YARN clusters 
affecting each other.
 
 #### Container Shutdown Behaviour
 
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 23a8aa2..eeed605 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -87,6 +87,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -209,6 +210,41 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
                 });
     }
 
+    /**
+     * Tests that we can retrieve an HA enabled cluster by only specifying the 
application id if no
+     * other high-availability.cluster-id has been configured. See FLINK-20866.
+     */
+    @Test
+    public void testClusterClientRetrieval() throws Exception {
+        runTest(
+                () -> {
+                    final YarnClusterDescriptor yarnClusterDescriptor =
+                            setupYarnClusterDescriptor();
+                    final RestClusterClient<ApplicationId> restClusterClient =
+                            deploySessionCluster(yarnClusterDescriptor);
+
+                    ClusterClient<ApplicationId> newClusterClient = null;
+                    try {
+                        final ApplicationId clusterId = 
restClusterClient.getClusterId();
+
+                        final YarnClusterDescriptor newClusterDescriptor =
+                                setupYarnClusterDescriptor();
+                        newClusterClient =
+                                
newClusterDescriptor.retrieve(clusterId).getClusterClient();
+
+                        assertThat(newClusterClient.listJobs().join(), 
is(empty()));
+
+                        newClusterClient.shutDownCluster();
+                    } finally {
+                        restClusterClient.close();
+
+                        if (newClusterClient != null) {
+                            newClusterClient.close();
+                        }
+                    }
+                });
+    }
+
     private void waitForApplicationAttempt(final ApplicationId applicationId, 
final int attemptId)
             throws Exception {
         final YarnClient yarnClient = getYarnClient();
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f07f8e1..e2b373a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1767,11 +1767,11 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
     private void setClusterEntrypointInfoToConfig(final ApplicationReport 
report) {
         checkNotNull(report);
 
-        final ApplicationId clusterId = report.getApplicationId();
+        final ApplicationId appId = report.getApplicationId();
         final String host = report.getHost();
         final int port = report.getRpcPort();
 
-        LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, 
clusterId);
+        LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, 
appId);
 
         flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
         flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
@@ -1779,8 +1779,13 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         flinkConfiguration.setString(RestOptions.ADDRESS, host);
         flinkConfiguration.setInteger(RestOptions.PORT, port);
 
-        flinkConfiguration.set(
-                YarnConfigOptions.APPLICATION_ID, 
ConverterUtils.toString(clusterId));
+        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, 
ConverterUtils.toString(appId));
+
+        // set cluster-id to app id if not specified
+        if 
(!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+            flinkConfiguration.set(
+                    HighAvailabilityOptions.HA_CLUSTER_ID, 
ConverterUtils.toString(appId));
+        }
     }
 
     public static void logDetachedClusterInformation(

Reply via email to