antonipp commented on code in PR #514:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/514#discussion_r1087919678
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+ public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
Review Comment:
Wrapped ZK client creation in a separate method so that it's easier to mock
once I get to unit tests
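As an illustration of the pattern (a standalone sketch, not code from this PR; all class names below are made up stand-ins for `AbstractFlinkService` and `CuratorFrameworkWithUnhandledErrorListener`), extracting the factory method lets a test subclass swap in a stub without touching a real ZooKeeper cluster:

```java
// Hypothetical sketch of the "extract a factory method for mockability" pattern.
// All names here are made up; the real method returns a
// CuratorFrameworkWithUnhandledErrorListener from getCurator(conf).
public class CuratorFactorySketch {

    /** Stand-in for the real Curator client wrapper. */
    public interface ZkClient extends AutoCloseable {
        boolean rootExists();
        @Override void close();
    }

    /** Stand-in for AbstractFlinkService with the extracted factory method. */
    public static class Service {
        public ZkClient getCurator(String conf) {
            throw new UnsupportedOperationException("would connect to a real ZK cluster");
        }

        public boolean haMetadataAvailable(String conf) {
            try (ZkClient client = getCurator(conf)) {
                return client.rootExists();
            }
        }
    }

    public static void main(String[] args) {
        // In a unit test, override the factory to return a canned client.
        Service service = new Service() {
            @Override
            public ZkClient getCurator(String conf) {
                return new ZkClient() {
                    @Override public boolean rootExists() { return true; }
                    @Override public void close() {}
                };
            }
        };
        System.out.println(service.haMetadataAvailable("dummy-conf")); // prints "true"
    }
}
```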
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -340,6 +341,11 @@ protected Configuration build() {
// Set cluster config
effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+
+ if (FlinkUtils.isZookeeperHAActivated(effectiveConfig)) {
+ effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
Review Comment:
By default, `clusterId` is generated [based on the FlinkDeployment resource
name](https://github.com/antonipp/flink-kubernetes-operator/blob/6b0d91fe2c1dc41eeba655337c1d99fedabe3ef8/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L96).
The value of `HighAvailabilityOptions.HA_CLUSTER_ID` is later
[used](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L226)
as the namespace for storing Zookeeper ZNodes for each Flink application.
This means that if someone creates two `FlinkDeployment`s with the same name
in two different Kubernetes namespaces and these two `FlinkDeployment`s use the
same ZK cluster for HA metadata storage, there might be a collision. So I think
we should append a random UUID to `HighAvailabilityOptions.HA_CLUSTER_ID` :
```java
effectiveConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId + "-RANDOM-UUID-HERE");
```
Not 100% sure of all the implications of doing that but I think it should be
safe enough?
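To make the collision concrete, here is a standalone illustration (not operator code; the path construction mirrors how ZooKeeperUtils combines `high-availability.zookeeper.path.root` and `HA_CLUSTER_ID` into the client namespace, and all values are made-up examples):

```java
// Illustration of the ZNode namespace collision: two FlinkDeployments with the
// same resource name (in different Kubernetes namespaces) default to the same
// HA_CLUSTER_ID and therefore the same ZK path. All values are made up.
public class ZkNamespaceCollision {
    public static String zkNamespace(String root, String haClusterId) {
        return root + "/" + haClusterId;
    }

    public static void main(String[] args) {
        String root = "/flink"; // default of high-availability.zookeeper.path.root
        // Same resource name "my-app" in two different Kubernetes namespaces:
        String fromTeamA = zkNamespace(root, "my-app");
        String fromTeamB = zkNamespace(root, "my-app");
        // Both deployments would read/write ZNodes under the same path:
        System.out.println(fromTeamA.equals(fromTeamB)); // prints "true" -> collision
        // Appending a random suffix (as suggested above) disambiguates them:
        String withSuffix = zkNamespace(root, "my-app-" + java.util.UUID.randomUUID());
        System.out.println(withSuffix.equals(fromTeamA)); // prints "false"
    }
}
```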
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+ public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+ return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
Review Comment:
Note that by leveraging the
[ZooKeeperUtils::startCuratorFramework](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L200)
function and by taking the application `conf`, we create the client in exactly
the same way as the Flink application itself. This means that we should be able
to support all ZK configuration settings that are supported by Flink
applications themselves (ex: ACLs).
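For example, a user-supplied `flinkConfiguration` along these lines (illustrative values; the quorum hosts and bucket name are made up) should be honored by both the Flink application and the operator's client, since both build their Curator client from the same conf:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://my-bucket/flink-ha   # made-up bucket
high-availability.zookeeper.client.acl: creator         # ZK ACL mode
```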
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+ public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+ return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+ }
+
@Override
public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata)
throws Exception {
LOG.info(
"Deploying application cluster{}",
requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+ // If Kubernetes or Zookeeper HA is activated, delete the job graph in HA storage so that
+ // the newly changed job config (e.g. parallelism) could take effect
if (FlinkUtils.isKubernetesHAActivated(conf)) {
final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
- // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
- // parallelism) could take effect
FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+ } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+ try (var curator = getCurator(conf)) {
Review Comment:
Here, the ZK client is created only for one request and then torn down (it's the same in `FlinkUtils::isZookeeperHaMetadataAvailable`). We could somehow cache these clients but I think it's not worth it.
First of all, these ZK requests happen quite rarely and the overhead of
starting one client per request is negligible IMO.
Moreover, if we decide to cache the client, we'll need to cache one per
Flink application. Since we use the application `conf` when instantiating the
client, we would also need to re-create the client every time ZK configuration
values change in the application `conf`, which adds even more unnecessary
complexity.
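To make the avoided complexity concrete, a per-application cache would need to look roughly like this hypothetical sketch (`ZkSettings`, `Client`, and every other name here is made up): it must key on the ZK-relevant config values and tear down and re-create the client whenever any of them changes.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of what caching ZK clients per Flink application would
// entail. All names are made up for illustration.
public class CuratorCacheSketch {
    /** Stand-in for the ZK-relevant subset of the application conf. */
    public record ZkSettings(String quorum, String root, String clusterId) {}

    /** Stand-in for a closeable ZK client. */
    public static class Client implements AutoCloseable {
        final ZkSettings settings;
        Client(ZkSettings settings) { this.settings = settings; }
        @Override public void close() {}
    }

    private final Map<String, Client> cache = new ConcurrentHashMap<>();

    public Client getOrCreate(String appName, ZkSettings settings) {
        return cache.compute(appName, (name, existing) -> {
            if (existing != null && Objects.equals(existing.settings, settings)) {
                return existing; // ZK config unchanged: reuse the cached client
            }
            if (existing != null) {
                existing.close(); // ZK config changed: must tear down the old client
            }
            return new Client(settings);
        });
    }

    public static void main(String[] args) {
        CuratorCacheSketch sketch = new CuratorCacheSketch();
        ZkSettings settings = new ZkSettings("zk:2181", "/flink", "app");
        Client first = sketch.getOrCreate("app", settings);
        Client reused = sketch.getOrCreate("app", settings);
        System.out.println(first == reused); // prints "true": same settings, reused
        Client recreated = sketch.getOrCreate("app", new ZkSettings("zk:2181", "/other", "app"));
        System.out.println(first == recreated); // prints "false": settings changed
    }
}
```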
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -160,19 +163,31 @@ public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+ public CuratorFrameworkWithUnhandledErrorListener getCurator(Configuration conf) {
+ return ZooKeeperUtils.startCuratorFramework(conf, exception -> {});
+ }
+
@Override
public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata)
throws Exception {
LOG.info(
"Deploying application cluster{}",
requireHaMetadata ? " requiring last-state from HA metadata" : "");
+
+ // If Kubernetes or Zookeeper HA is activated, delete the job graph in HA storage so that
+ // the newly changed job config (e.g. parallelism) could take effect
if (FlinkUtils.isKubernetesHAActivated(conf)) {
final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
- // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
- // parallelism) could take effect
FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+ } else if (FlinkUtils.isZookeeperHAActivated(conf)) {
+ try (var curator = getCurator(conf)) {
+ ZooKeeperUtils.deleteZNode(
Review Comment:
[This
comment](https://issues.apache.org/jira/browse/FLINK-27273?focusedCommentId=17552240&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17552240)
on the Jira ticket suggested using
`HighAvailabilityServicesUtils::createAvailableOrEmbeddedServices`.
I gave it a try with this piece of code (obviously not production ready but
good enough for a POC):
```java
try (var availableOrEmbeddedServices =
        HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                conf, UnsupportedOperationExecutor.INSTANCE, exception -> {})) {
    availableOrEmbeddedServices
            .getJobGraphStore()
            .globalCleanupAsync(
                    availableOrEmbeddedServices.getJobGraphStore().getJobIds().stream()
                            .findFirst()
                            .get(),
                    Executors.newFixedThreadPool(1));
}
```
The problem is that under the hood this will call
`HighAvailabilityServicesUtils::createZooKeeperHaServices` which in turn will
[instantiate](https://github.com/apache/flink/blob/fb05a7be9b828b7e582e75e4832443806fa4ff17/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java#L89)
a `BlobStoreService` object. I immediately ran into issues with S3 FileSystem
initialization since this is the file system we use for
`high-availability.storageDir`. This is why I switched to
`ZooKeeperUtils::startCuratorFramework` instead.
As a side note, we could also authorize the Operator to access `high-availability.storageDir` and clean up the data there as well (right now, both for Kubernetes HA and ZK HA, we only delete the pointer to the JobGraph from the HA metadata but we don't delete the JobGraph object itself). This adds a lot of complexity though, since we'd need a mechanism for adding the relevant FS client dependencies and a mechanism for configuring these clients as well (i.e. specifying credentials, retry options, etc.).
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##########
@@ -133,7 +136,23 @@ public static void deleteJobGraphInKubernetesHA(
}
}
- public static boolean isHaMetadataAvailable(
+ public static boolean isZookeeperHaMetadataAvailable(
+ Configuration conf, CuratorFrameworkWithUnhandledErrorListener curator) {
+ try (curator) {
+ return curator.asCuratorFramework().checkExists().forPath("/") != null;
Review Comment:
The ZK client is instantiated with
`HighAvailabilityOptions.HA_ZOOKEEPER_ROOT /
HighAvailabilityOptions.HA_CLUSTER_ID` as the namespace for all operations (set
[here](https://github.com/apache/flink/blob/c57b84921447bb0ade5e1ff77a05ebd8bbbe71b7/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L267)).
This code basically tests whether the root ZNode exists.
Note that `checkExists` will return `null` if the path doesn't exist
(http://www.mtitek.com/tutorials/zookeeper/cf_check_exists.php)
##########
docs/content/docs/concepts/overview.md:
##########
@@ -88,7 +88,7 @@ The examples are maintained as part of the operator repo and
can be found [here]
## Known Issues & Limitations
### JobManager High-availability
Review Comment:
TBH I don't know why this was in the "Known Issues & Limitations" section. Maybe we can move it somewhere else?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]