antonipp commented on code in PR #514:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1088951027
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+ public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration
conf) {
+ return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+ }
+
@Override
public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata)
throws Exception {
LOG.info(
"Deploying application cluster{}",
requireHaMetadata ? " requiring last-state from HA metadata" :
"");
+
+ // If Kubernetes or Zookeeper HA are activated, delete the job graph
in HA storage so that
+ // the newly changed job config (e.g. parallelism) could take effect
if (FlinkUtils.isKubernetesHAActivated(conf)) {
final String clusterId =
conf.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace =
conf.get(KubernetesConfigOptions.NAMESPACE);
- // Delete the job graph in the HA ConfigMaps so that the newly
changed job config(e.g.
- // parallelism) could take effect
FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace,
kubernetesClient);
+ } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+ try (var curator = getCurator(conf)) {
+ ZooKeeperUtils.deleteZNode(
Review Comment:
Yeah makes sense, then even more arguments for not doing this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]