This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 46514f38a2e364da30a96444ece45eceb1e09762 Author: Till Rohrmann <[email protected]> AuthorDate: Thu Jan 7 11:02:20 2021 +0100 [FLINK-20866][yarn] Set high-availability.cluster-id to application id if not configured In order to easily connect to an HA enabled Yarn cluster, this commit sets the high-availability.cluster-id to the application id if not configured. This is symmetric to how we deploy HA enabled clusters and makes it unnecessary to explicitly set the high-availability.cluster-id if one tries to reconnect to the cluster. This closes #14577. --- docs/deployment/resource-providers/yarn.md | 6 +++- docs/deployment/resource-providers/yarn.zh.md | 6 +++- .../flink/yarn/YARNHighAvailabilityITCase.java | 36 ++++++++++++++++++++++ .../apache/flink/yarn/YarnClusterDescriptor.java | 13 +++++--- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/docs/deployment/resource-providers/yarn.md b/docs/deployment/resource-providers/yarn.md index 759d203..3d8837d 100644 --- a/docs/deployment/resource-providers/yarn.md +++ b/docs/deployment/resource-providers/yarn.md @@ -194,7 +194,11 @@ Once a HA service is configured, it will persist JobManager metadata and perform YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2. -Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). 
Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other. +Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN. +Flink sets it by default to the YARN application id. +**You should not overwrite this parameter when deploying an HA cluster on YARN**. +The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). +Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other. #### Container Shutdown Behaviour diff --git a/docs/deployment/resource-providers/yarn.zh.md b/docs/deployment/resource-providers/yarn.zh.md index 0cb589b..72e4505 100644 --- a/docs/deployment/resource-providers/yarn.zh.md +++ b/docs/deployment/resource-providers/yarn.zh.md @@ -194,7 +194,11 @@ Once a HA service is configured, it will persist JobManager metadata and perform YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.zh.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2. -Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other. +Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN. +Flink sets it by default to the YARN application id. 
+**You should not overwrite this parameter when deploying an HA cluster on YARN**. +The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). +Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other. #### Container Shutdown Behaviour diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 23a8aa2..eeed605 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -87,6 +87,7 @@ import java.util.function.Function; import java.util.function.Predicate; import static org.apache.flink.util.Preconditions.checkState; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -209,6 +210,41 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { }); } + /** + * Tests that we can retrieve an HA enabled cluster by only specifying the application id if no + * other high-availability.cluster-id has been configured. See FLINK-20866. 
+ */ + @Test + public void testClusterClientRetrieval() throws Exception { + runTest( + () -> { + final YarnClusterDescriptor yarnClusterDescriptor = + setupYarnClusterDescriptor(); + final RestClusterClient<ApplicationId> restClusterClient = + deploySessionCluster(yarnClusterDescriptor); + + ClusterClient<ApplicationId> newClusterClient = null; + try { + final ApplicationId clusterId = restClusterClient.getClusterId(); + + final YarnClusterDescriptor newClusterDescriptor = + setupYarnClusterDescriptor(); + newClusterClient = + newClusterDescriptor.retrieve(clusterId).getClusterClient(); + + assertThat(newClusterClient.listJobs().join(), is(empty())); + + newClusterClient.shutDownCluster(); + } finally { + restClusterClient.close(); + + if (newClusterClient != null) { + newClusterClient.close(); + } + } + }); + } + private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId) throws Exception { final YarnClient yarnClient = getYarnClient(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index f07f8e1..e2b373a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1767,11 +1767,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { private void setClusterEntrypointInfoToConfig(final ApplicationReport report) { checkNotNull(report); - final ApplicationId clusterId = report.getApplicationId(); + final ApplicationId appId = report.getApplicationId(); final String host = report.getHost(); final int port = report.getRpcPort(); - LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, clusterId); + LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId); flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); 
flinkConfiguration.setInteger(JobManagerOptions.PORT, port); @@ -1779,8 +1779,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { flinkConfiguration.setString(RestOptions.ADDRESS, host); flinkConfiguration.setInteger(RestOptions.PORT, port); - flinkConfiguration.set( - YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(clusterId)); + flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId)); + + // set cluster-id to app id if not specified + if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) { + flinkConfiguration.set( + HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId)); + } } public static void logDetachedClusterInformation(
