gaborgsomogyi commented on code in PR #501:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/501#discussion_r1062495719
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -184,8 +158,9 @@ protected void deploy(
throw new RuntimeException("This indicates a bug...");
}
LOG.info("Deleting deployment with terminated application before new deployment");
- flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
- flinkService.waitForClusterShutdown(deployConfig);
+ ctx.getFlinkService()
Review Comment:
Nit: we call `ctx.getFlinkService()` about three times here; maybe store it in a local `var`.
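A minimal sketch of the suggested refactor. The `FlinkService` and `ResourceContext` interfaces below are hypothetical, simplified stand-ins (the operator's real types have different signatures, and `submitCluster` is an illustrative name). The service is looked up once into a local `var` and reused.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not the operator's real FlinkService
// or FlinkResourceContext APIs.
class FlinkServiceVarSketch {

    interface FlinkService {
        void deleteClusterDeployment();
        void waitForClusterShutdown();
        void submitCluster();
    }

    interface ResourceContext {
        FlinkService getFlinkService();
    }

    static void redeploy(ResourceContext ctx) {
        // Look the service up once and reuse the local variable instead of
        // calling ctx.getFlinkService() at every use site.
        var flinkService = ctx.getFlinkService();
        flinkService.deleteClusterDeployment();
        flinkService.waitForClusterShutdown();
        flinkService.submitCluster();
    }

    /** Runs redeploy against a recording fake and returns the observed calls. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        FlinkService recording = new FlinkService() {
            @Override public void deleteClusterDeployment() { calls.add("delete"); }
            @Override public void waitForClusterShutdown() { calls.add("wait"); }
            @Override public void submitCluster() { calls.add("submit"); }
        };
        int[] lookups = {0};
        // Counts how often the context is asked for the service.
        ResourceContext ctx = () -> {
            lookups[0]++;
            return recording;
        };
        redeploy(ctx);
        calls.add("lookups=" + lookups[0]);
        return calls;
    }
}
```

With the local variable, the context is queried exactly once per call, and the three service calls read as one sequence.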
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/** Context for reconciling a Flink resource. * */
+@RequiredArgsConstructor
+public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?>> {
+
+ @Getter private final CR resource;
+ @Getter private final Context<?> josdkContext;
+ @Getter private final KubernetesResourceMetricGroup resourceMetricGroup;
+
+ private Configuration observeConfig;
+
+ /**
+ * Get the config that is currently deployed for the resource spec.
+ *
+ * @return Config currently deployed.
+ */
+ public Configuration getObserveConfig() {
+ if (observeConfig != null) {
+ return observeConfig;
+ }
+ return observeConfig = createObserveConfig();
+ }
+
+ /**
+ * Get Flink configuration object for deploying the given spec using {@link
+ * org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler#deploy}.
+ *
+ * @param spec Spec for which the config should be created.
+ * @return Deployment configuration.
Review Comment:
I think this is a general consideration, but I'm only writing it here: it's worth documenting that the return value can be null. I say this because sometimes getting null is normal behavior (for example, when the session job is not ready). Plan B would be to expect null from every function, but in that case the null handling would be overkill.
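A minimal sketch of documenting an expected null return on a single method, so that only its callers need a null check. `SessionJob`, its readiness flag and `getDeployConfig` are hypothetical stand-ins, not the operator's real API; the map stands in for a Flink `Configuration`.

```java
import java.util.Map;

// Hypothetical stand-ins; not the operator's actual classes or signatures.
class NullableConfigSketch {

    /** Stand-in for a session job whose session cluster may not be ready yet. */
    static final class SessionJob {
        final boolean sessionClusterReady;

        SessionJob(boolean sessionClusterReady) {
            this.sessionClusterReady = sessionClusterReady;
        }
    }

    /**
     * Get the Flink configuration for deploying the given session job.
     *
     * @param job Session job to deploy.
     * @return Deployment configuration, or {@code null} if the session cluster
     *     is not ready yet (an expected state, not an error).
     */
    static Map<String, String> getDeployConfig(SessionJob job) {
        if (!job.sessionClusterReady) {
            return null;
        }
        return Map.of("execution.target", "kubernetes-session");
    }

    /** Caller that handles the documented null case explicitly. */
    static String reconcile(SessionJob job) {
        var conf = getDeployConfig(job);
        if (conf == null) {
            return "waiting for session cluster";
        }
        return "deploying with " + conf.get("execution.target");
    }
}
```

Documenting nullability only where null is a legitimate outcome keeps the checks local, instead of defensively null-checking every getter.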
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -139,7 +137,10 @@ private void observeTriggeredSavepoint(
/** Clean up and dispose savepoints according to the configured max size/age. */
@VisibleForTesting
- void cleanupSavepointHistory(SavepointInfo currentSavepointInfo, Configuration deployedConfig) {
+ void cleanupSavepointHistory(
+ FlinkService flinkService,
+ SavepointInfo currentSavepointInfo,
+ Configuration observeConfig) {
Review Comment:
We now receive `observeConfig` instead of `deployedConfig`. Is this a behavior change or just a rename?
I see the same pattern in other places, so the question applies there as well.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -112,11 +85,11 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode(
KubernetesOperatorConfigOptions
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
&& FlinkUtils.isKubernetesHAActivated(deployConfig)
- && FlinkUtils.isKubernetesHAActivated(observeConfig)
+ && FlinkUtils.isKubernetesHAActivated(ctx.getObserveConfig())
&& !flinkVersionChanged(
ReconciliationUtils.getDeployedSpec(deployment),
deployment.getSpec())) {
- if (flinkService.isHaMetadataAvailable(deployConfig)) {
+ if (ctx.getFlinkService().isHaMetadataAvailable(deployConfig)) {
Review Comment:
As far as I can see, calling `ctx.getFlinkService()` multiple times is a general pattern in this PR, so the local-variable nit applies elsewhere too.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -0,0 +1,74 @@
+ public Configuration getObserveConfig() {
+ if (observeConfig != null) {
+ return observeConfig;
+ }
Review Comment:
Does this mean we never update the observe config?
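To make the question concrete: the getter above computes `observeConfig` on first access and caches it in the context instance, while `getResourceContext` in the factory further down constructs a new context object on every call. Assuming the controller requests a fresh context for each reconciliation (not verified here), the config would stay fixed within one loop and be re-read on the next. A minimal sketch of that per-instance lazy caching, using a hypothetical `Context` stand-in and a string in place of a Flink `Configuration`:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for FlinkResourceContext's lazily cached observe config.
class ObserveConfigCacheSketch {

    static final class Context {
        private final Supplier<String> configSource;
        private String observeConfig; // cached after the first call

        Context(Supplier<String> configSource) {
            this.configSource = configSource;
        }

        String getObserveConfig() {
            if (observeConfig == null) {
                observeConfig = configSource.get();
            }
            return observeConfig;
        }
    }

    static String demo() {
        int[] version = {0};
        // Each get() simulates reading a possibly changed deployed config.
        Supplier<String> source = () -> "config-v" + (++version[0]);

        var firstLoop = new Context(source);
        String a = firstLoop.getObserveConfig();
        String b = firstLoop.getObserveConfig(); // served from the cache

        var secondLoop = new Context(source); // new context, so a fresh read
        String c = secondLoop.getObserveConfig();
        return a + "," + b + "," + c;
    }
}
```

Under that assumption the cache is never refreshed inside one context; if the deployed spec can change within a single reconciliation, the cached field would presumably need an explicit reset.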
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -159,23 +133,23 @@ private void deleteJmThatNeverStarted(FlinkDeployment deployment, Configuration
}
@Override
- protected void deploy(
- FlinkDeployment relatedResource,
+ public void deploy(
+ FlinkResourceContext<FlinkDeployment> ctx,
FlinkDeploymentSpec spec,
- FlinkDeploymentStatus status,
- Context<?> ctx,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
throws Exception {
+ var relatedResource = ctx.getResource();
+ var status = relatedResource.getStatus();
if (savepoint.isPresent()) {
deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
} else {
deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
}
- setOwnerReference(relatedResource, deployConfig);
+ setOwnerReference(ctx.getResource(), deployConfig);
Review Comment:
Here we assign `relatedResource` but then call `ctx.getResource()` again. Is there a specific reason?
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java:
##########
@@ -34,26 +33,30 @@ public class ClusterHealthObserver {
private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthObserver.class);
private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
- private final FlinkService flinkService;
private final ClusterHealthEvaluator clusterHealthEvaluator;
- public ClusterHealthObserver(FlinkService flinkService) {
- this.flinkService = flinkService;
+ public ClusterHealthObserver() {
this.clusterHealthEvaluator = new ClusterHealthEvaluator(Clock.systemDefaultZone());
}
- /** Observe the health of the flink cluster. */
- public void observe(FlinkDeployment flinkApp, Configuration deployedConfig) {
+ /**
+ * Observe the health of the flink cluster.
+ *
+ * @param ctx Resource context.
+ */
+ public void observe(FlinkResourceContext<FlinkDeployment> ctx) {
+ var flinkApp = ctx.getResource();
Review Comment:
This doesn't belong to this PR, but it may be worth mentioning: the return value of `getResourceContext()`... rather, of `getResource()`, is named at least 3-4 different ways in the source (`flinkApp`, `resource`, `flinkDep`, `deployment`, ...). The aim is not to enforce a single name, but these differ enough that an inexperienced reader can easily get confused.
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobContext;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Factory for creating the {@link FlinkResourceContext}. */
+public class FlinkResourceContextFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkResourceContextFactory.class);
+
+ private final KubernetesClient kubernetesClient;
+ private final FlinkConfigManager configManager;
+ private final KubernetesOperatorMetricGroup operatorMetricGroup;
+ private final Map<KubernetesDeploymentMode, FlinkService> serviceMap;
+
+ private final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup>
+ resourceMetricGroups = new ConcurrentHashMap<>();
+
+ public FlinkResourceContextFactory(
+ KubernetesClient kubernetesClient,
+ FlinkConfigManager configManager,
+ KubernetesOperatorMetricGroup operatorMetricGroup) {
+ this.kubernetesClient = kubernetesClient;
+ this.configManager = configManager;
+ this.operatorMetricGroup = operatorMetricGroup;
+ this.serviceMap = new ConcurrentHashMap<>();
+ }
+
+ public <CR extends AbstractFlinkResource<?, ?>> FlinkResourceContext<CR> getResourceContext(
+ CR resource, Context josdkContext) {
+ var resMg =
+ resourceMetricGroups.computeIfAbsent(
+ Tuple2.of(resource.getClass(), ResourceID.fromResource(resource)),
+ r ->
+ OperatorMetricUtils.createResourceMetricGroup(
+ operatorMetricGroup, configManager, resource));
+
+ if (resource instanceof FlinkDeployment) {
+ var flinkDep = (FlinkDeployment) resource;
+ return (FlinkResourceContext<CR>)
+ new FlinkDeploymentContext(
+ flinkDep,
+ josdkContext,
+ resMg,
+ getOrCreateFlinkService(flinkDep),
+ configManager);
+ } else if (resource instanceof FlinkSessionJob) {
+ return (FlinkResourceContext<CR>)
+ new FlinkSessionJobContext(
+ (FlinkSessionJob) resource, josdkContext, resMg, this, configManager);
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown resource type " + resource.getClass().getSimpleName());
+ }
Review Comment:
I think an `instanceof` chain is usually a sign that a new interface is missing. I'm not sure where it should live, but I'm pretty sure we can do better here.
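One possible direction, sketched with hypothetical stand-in types (not the operator's real `AbstractFlinkResource`, `FlinkDeployment`, `FlinkSessionJob` or context classes): register one context builder per resource type and look it up by class, so supporting a new resource type means adding a registration rather than another `instanceof` branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the resource and context types in this PR.
class ContextRegistrySketch {

    abstract static class Resource {}

    static final class Deployment extends Resource {}

    static final class SessionJob extends Resource {}

    interface ResourceContext {
        String describe();
    }

    private final Map<Class<? extends Resource>, Function<Resource, ResourceContext>> builders =
            new HashMap<>();

    <R extends Resource> void register(Class<R> type, Function<R, ResourceContext> builder) {
        // type.cast performs a checked cast; lookups are keyed by the exact
        // runtime class, so the cast always succeeds for registered types.
        builders.put(type, r -> builder.apply(type.cast(r)));
    }

    ResourceContext getResourceContext(Resource resource) {
        // Exact-class lookup: unlike instanceof, subclasses do not match.
        var builder = builders.get(resource.getClass());
        if (builder == null) {
            throw new IllegalArgumentException(
                    "Unknown resource type " + resource.getClass().getSimpleName());
        }
        return builder.apply(resource);
    }

    static ContextRegistrySketch withDefaults() {
        var registry = new ContextRegistrySketch();
        registry.register(Deployment.class, d -> () -> "deployment context");
        registry.register(SessionJob.class, j -> () -> "session job context");
        return registry;
    }
}
```

An alternative would be an abstract context-creating method on the resource type itself, but that would presumably couple the API module to operator internals, which the registry keeps inside the factory.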
--
This is an automated message from the Apache Git Service.