Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5584#discussion_r170922634
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -479,36 +479,60 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException
                        ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;
     
    -           flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
    +           effectiveConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
    +
    +           final ApplicationId appId = yarnApplication.getApplicationSubmissionContext().getApplicationId();
     
    -           ApplicationReport report = startAppMaster(
    -                   new Configuration(flinkConfiguration),
    +           // --------- Add Zookeeper namespace to effectiveConfiguration ---------
    +           String zkNamespace = getZookeeperNamespace();
    +           // no user specified cli argument for namespace?
    +           if (zkNamespace == null || zkNamespace.isEmpty()) {
    +                   // namespace defined in config? else use applicationId as default.
    +                   zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, appId.toString());
    +                   setZookeeperNamespace(zkNamespace);
    +           }
    +           effectiveConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
    --- End diff --
    
    This would then mean that we only write the `appId` as the `HA_CLUSTER_ID` if no value is set, i.e. neither as a CLI namespace argument nor in the configuration.
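
    To make the fallback order concrete, here is a minimal standalone sketch of what the diff appears to do. It is not the Flink code: `ClusterIdFallback`, `resolveClusterId`, and the plain `Map` standing in for `flinkConfiguration` are hypothetical, and the key string is assumed to correspond to `HighAvailabilityOptions.HA_CLUSTER_ID`.

```java
import java.util.HashMap;
import java.util.Map;

public class ClusterIdFallback {
    // Assumption: stands in for HighAvailabilityOptions.HA_CLUSTER_ID.
    static final String HA_CLUSTER_ID_KEY = "high-availability.cluster-id";

    // Hypothetical helper mirroring the order in the diff:
    // 1. CLI namespace, 2. value from the configuration, 3. appId as default.
    static String resolveClusterId(String cliNamespace, Map<String, String> config, String appId) {
        if (cliNamespace != null && !cliNamespace.isEmpty()) {
            return cliNamespace;
        }
        // The appId is only used when the configuration has no value for the key.
        return config.getOrDefault(HA_CLUSTER_ID_KEY, appId);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        String appId = "app-1"; // made-up id for illustration

        System.out.println(resolveClusterId(null, config, appId));     // nothing set
        config.put(HA_CLUSTER_ID_KEY, "my-cluster");
        System.out.println(resolveClusterId(null, config, appId));     // config value set
        System.out.println(resolveClusterId("cli-ns", config, appId)); // CLI argument set
    }
}
```

    Under these assumptions, the `appId` only ends up as the cluster id in the first call; in the other two an explicitly set value takes precedence.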

