Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5584#discussion_r170922634
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
---
@@ -479,36 +479,60 @@ public void terminateCluster(ApplicationId
applicationId) throws FlinkException
ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
- flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE,
executionMode.toString());
+
effectiveConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE,
executionMode.toString());
+
+ final ApplicationId appId =
yarnApplication.getApplicationSubmissionContext().getApplicationId();
- ApplicationReport report = startAppMaster(
- new Configuration(flinkConfiguration),
+ // --------- Add Zookeeper namespace to effectiveConfiguration
---------
+ String zkNamespace = getZookeeperNamespace();
+ // no user specified cli argument for namespace?
+ if (zkNamespace == null || zkNamespace.isEmpty()) {
+ // namespace defined in config? else use applicationId
as default.
+ zkNamespace =
flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID,
appId.toString());
+ setZookeeperNamespace(zkNamespace);
+ }
+
effectiveConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
zkNamespace);
--- End diff --
This would then mean that we only write the `appId` as the `HA_CLUSTER_ID`
if there is no value set.
---