Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5584#discussion_r170921732
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
---
@@ -479,36 +479,60 @@ public void terminateCluster(ApplicationId
applicationId) throws FlinkException
ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
- flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE,
executionMode.toString());
+
effectiveConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE,
executionMode.toString());
+
+ final ApplicationId appId =
yarnApplication.getApplicationSubmissionContext().getApplicationId();
- ApplicationReport report = startAppMaster(
- new Configuration(flinkConfiguration),
+ // --------- Add Zookeeper namespace to effectiveConfiguration
---------
+ String zkNamespace = getZookeeperNamespace();
+ // no user specified cli argument for namespace?
+ if (zkNamespace == null || zkNamespace.isEmpty()) {
+ // namespace defined in config? else use applicationId
as default.
+ zkNamespace =
flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID,
appId.toString());
+ setZookeeperNamespace(zkNamespace);
+ }
+
effectiveConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID,
zkNamespace);
+
+ effectiveConfiguration.setInteger(
+ ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ clusterSpecification.getSlotsPerTaskManager());
+
+ // write out configuration file so that it can be uploaded
+ File tmpConfigurationFile = File.createTempFile(appId +
"-flink-conf.yaml", null);
+ tmpConfigurationFile.deleteOnExit();
+ BootstrapTools.writeConfiguration(effectiveConfiguration,
tmpConfigurationFile);
--- End diff --
Why do we pull the writing of the configuration out of `startAppMaster`?
This belongs somewhat to the `startAppMaster` method, because then it could
also be cleaned up there after starting the application master.
---