antonipp commented on code in PR #514:
URL: https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1087919678


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
     }
 
+    public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {

Review Comment:
   Wrapped ZK client creation in a separate method so that it's easier to mock once I get to unit tests.
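   
   Just to sketch what I mean (illustrative only, not part of this PR): with the creation wrapped like this, a test could stub out the ZK client along these lines, assuming a Mockito-based setup and a `flinkService` instance under test:
   ```java
   // Hypothetical test sketch: stub getCurator so no real ZooKeeper connection is needed.
   // Assumes CuratorFrameworkWithUnhandledErrorListener can be mocked.
   var curatorMock = Mockito.mock(CuratorFrameworkWithUnhandledErrorListener.class);
   var serviceSpy = Mockito.spy(flinkService);
   Mockito.doReturn(curatorMock).when(serviceSpy).getCurator(Mockito.any(Configuration.class));
   ```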



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -340,6 +341,11 @@ protected Configuration build() {
         // Set cluster config
         effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
         effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+
+        if (FlinkUtils.isZookeeperHAActivated(effectiveConfig)) {
+            effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);

Review Comment:
   By default, `clusterId` is generated [based on the FlinkDeployment resource name](https://github.com/antonipp/flink-kubernetes-operator/blob/6b0d91fe2c1dc41eeba655337c1d99fedabe3ef8/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L96).
   
   The value of `HighAvailabilityOptions.HA_CLUSTER_ID` is later [used](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L226) as the namespace for storing ZooKeeper ZNodes for each Flink application.
   
   This means that if someone creates two `FlinkDeployment`s with the same name in two different Kubernetes namespaces and these two `FlinkDeployment`s use the same ZK cluster for HA metadata storage, there might be a collision. So I think we should append a random UUID to `HighAvailabilityOptions.HA_CLUSTER_ID`:
   ```java
   effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId + "-RANDOM-UUID-HERE");
   ```
   Not 100% sure of all the implications of doing that but I think it should be safe enough?
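   
   Concretely, the placeholder above could be filled in roughly like this (just a sketch of the idea, not tested):
   ```java
   // Sketch: make the ZK namespace unique per FlinkDeployment by appending a random suffix.
   // Open question: whether the suffix would need to stay stable across reconciliations.
   effectiveConfig.setString(
           HighAvailabilityOptions.HA_CLUSTER_ID,
           clusterId + "-" + java.util.UUID.randomUUID());
   ```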



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
     }
 
+    public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+        return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});

Review Comment:
   Note that by leveraging the [ZooKeeperUtils::startCuratorFramework](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L200) function and by taking the application `conf`, we create the client in exactly the same way as the Flink application itself. This means that we should be able to support all ZK configuration settings that are supported by Flink applications themselves (e.g. ACLs).
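   
   For example (illustrative only, not from this PR; in practice the values come from the application's own configuration rather than being set by hand), options like these flow straight through to the operator-side client:
   ```java
   // Sketch (inside a method that declares `throws Exception`): the operator-side Curator
   // client picks up the application's ZK HA settings, including ACL mode.
   Configuration conf = new Configuration();
   conf.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
   conf.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk-0:2181,zk-1:2181,zk-2:2181");
   conf.set(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL, "creator");
   try (var curator = ZooKeeperUtils.startCuratorFramework(conf, exception -> {})) {
       // Namespaced and secured exactly like the Flink application's own ZK client.
       curator.asCuratorFramework().checkExists().forPath("/");
   }
   ```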



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
     }
 
+    public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+        return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+    }
+
+
     @Override
     public void submitApplicationCluster(
             JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
         LOG.info(
                 "Deploying application cluster{}",
                 requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+        // If Kubernetes or Zookeeper HA are activated, delete the job graph in HA storage so that
+        // the newly changed job config (e.g. parallelism) could take effect
         if (FlinkUtils.isKubernetesHAActivated(conf)) {
             final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
             final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-            // parallelism) could take effect
             FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+        } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+            try (var curator = getCurator(conf)) {

Review Comment:
   Here, the ZK client is created only for one request and then torn down (the same thing happens in `FlinkUtils::isZookeeperHaMetadataAvailable`). We could somehow cache these clients but I think it's not worth it.
   
   First of all, these ZK requests happen quite rarely and the overhead of starting one client per request is negligible IMO.
   
   Moreover, if we decide to cache the client, we'll need to cache one per Flink application. Since we use the application `conf` when instantiating the client, we would also need to re-create the client every time ZK configuration values change in the application `conf`, which adds even more unnecessary complexity.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
     }
 
+    public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+        return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+    }
+
+
     @Override
     public void submitApplicationCluster(
             JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
         LOG.info(
                 "Deploying application cluster{}",
                 requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+        // If Kubernetes or Zookeeper HA are activated, delete the job graph in HA storage so that
+        // the newly changed job config (e.g. parallelism) could take effect
         if (FlinkUtils.isKubernetesHAActivated(conf)) {
             final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
             final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-            // parallelism) could take effect
             FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+        } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+            try (var curator = getCurator(conf)) {
+                ZooKeeperUtils.deleteZNode(

Review Comment:
   [This comment](https://issues.apache.org/jira/browse/FLINK-27273?focusedCommentId=17552240&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17552240) on the Jira ticket suggested using `HighAvailabilityServicesUtils::createAvailableOrEmbeddedServices`.
   
   I gave it a try with this piece of code (obviously not production ready but good enough for a POC):
   ```java
   try (var availableOrEmbeddedServices =
           HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                   conf, UnsupportedOperationExecutor.INSTANCE, exception -> {})) {
       availableOrEmbeddedServices
               .getJobGraphStore()
               .globalCleanupAsync(
                       availableOrEmbeddedServices.getJobGraphStore().getJobIds().stream()
                               .findFirst()
                               .get(),
                       Executors.newFixedThreadPool(1));
   }
   ```
   
   The problem is that under the hood this will call `HighAvailabilityServicesUtils::createZooKeeperHaServices`, which in turn will [instantiate](https://github.com/apache/flink/blob/fb05a7be9b828b7e582e75e4832443806fa4ff17/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java#L89) a `BlobStoreService` object. I immediately ran into issues with S3 FileSystem initialization, since that is the file system we use for `high-availability.storageDir`. This is why I switched to `ZooKeeperUtils::startCuratorFramework` instead.
   
   As a side note, we could also authorize the Operator to access `high-availability.storageDir` and clean up the data there as well (right now, both for Kubernetes HA and ZK HA, we only delete the pointer to the JobGraph from the HA metadata but we don't delete the JobGraph object itself). This adds a lot of complexity though, since we'll need a mechanism for adding the relevant FS client dependencies and a mechanism for configuring these clients as well (i.e. specifying credentials, retry options, etc.).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##########
@@ -133,7 +136,23 @@ public static void deleteJobGraphInKubernetesHA(
         }
     }
 
-    public static boolean isHaMetadataAvailable(
+    public static boolean isZookeeperHaMetadataAvailable(
+            Configuration conf, CuratorFrameworkWithUnhandledErrorListener curator) {
+        try (curator) {
+            return curator.asCuratorFramework().checkExists().forPath("/") != null;

Review Comment:
   The ZK client is instantiated with `HighAvailabilityOptions.HA_ZOOKEEPER_ROOT / HighAvailabilityOptions.HA_CLUSTER_ID` as the namespace for all operations (set [here](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L267)). This code basically tests whether the root ZNode exists.
   
   Note that `checkExists` will return `null` if the path doesn't exist (http://www.mtitek.com/tutorials/zookeeper/cf_check_exists.php).
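   
   In other words (sketch only, `Stat` comes from the ZooKeeper client API):
   ```java
   // checkExists() returns a Stat for the (namespaced) path, or null if the ZNode is absent.
   Stat stat = curator.asCuratorFramework().checkExists().forPath("/");
   boolean zkHaMetadataAvailable = (stat != null);
   ```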



##########
docs/content/docs/concepts/overview.md:
##########
@@ -88,7 +88,7 @@ The examples are maintained as part of the operator repo and can be found [here]
 ## Known Issues & Limitations
 
 ### JobManager High-availability

Review Comment:
   TBH I'm not sure why this was in the "Known Issues & Limitations" section. Maybe we can move it somewhere else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
