Aitozi commented on code in PR #274:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906991319


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Base class for all Flink resource reconcilers. It contains the general flow 
of reconciling Flink
+ * related resources including initial deployments, upgrades, rollbacks etc.
+ */
+public abstract class AbstractFlinkResourceReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public AbstractFlinkResourceReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        var spec = cr.getSpec();
+        var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+        var status = cr.getStatus();
+
+        // If the resource is not ready for reconciliation we simply return
+        if (!readyToReconcile(cr, ctx, deployConfig)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var firstDeployment = 
status.getReconciliationStatus().getLastReconciledSpec() == null;
+
+        // If this is the first deployment for the resource we simply submit 
the job and return.
+        // No further logic is required at this point.
+        if (firstDeployment) {
+            LOG.info("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    
Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        var reconciliationStatus = cr.getStatus().getReconciliationStatus();
+        SPEC lastReconciledSpec =
+                
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = cr.getSpec();
+
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        var observeConfig = getObserveConfig(cr, ctx);
+
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
+                return;
+            }
+            LOG.info("Reconciling spec change");
+            reconcileSpecChange(cr, observeConfig, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+            // Rollbacks are executed in two steps, we initiate it first then 
return
+            if (initiateRollBack(status)) {
+                return;
+            }
+            LOG.warn("Executing rollback operation");
+            rollback(cr, ctx, observeConfig);
+        } else if (!reconcileOtherChanges(cr, observeConfig)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using 
{@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Deployment configuration.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC 
spec, Context ctx);
+
+    /**
+     * Get Flink configuration for client interactions with the running Flink 
deployment/session
+     * job.
+     *
+     * @param resource Related Flink resource.
+     * @param context Reconciliation context.
+     * @return Observe configuration.
+     */
+    protected abstract Configuration getObserveConfig(CR resource, Context 
context);
+
+    /**
+     * Check whether the given Flink resource is ready to be reconciled or we 
are still waiting for
+     * any pending operation or condition first.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param deployConfig Deployment configuration.
+     * @return True if the resource is ready to be reconciled.
+     */
+    protected abstract boolean readyToReconcile(CR cr, Context ctx, 
Configuration deployConfig);
+
+    /**
+     * Reconcile spec upgrade on the currently deployed/suspended Flink 
resource and update the
+     * status accordingly.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Deployment configuration.
+     * @throws Exception
+     */
+    protected abstract void reconcileSpecChange(
+            CR cr, Configuration observeConfig, Configuration deployConfig) 
throws Exception;
+
+    /**
+     * Rollback deployed resource to the last stable spec.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param observeConfig Observe configuration.
+     * @throws Exception
+     */
+    protected abstract void rollback(CR cr, Context ctx, Configuration 
observeConfig)
+            throws Exception;
+
+    /**
+     * Reconcile any other changes required for this resource that are 
specific to the reconciler
+     * implementation.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @return True if any further reconciliation action was taken.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileOtherChanges(CR cr, Configuration 
observeConfig)
+            throws Exception;
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session 
jobs.
+     * @param requireHaMetadata Flag used by application deployments to 
validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context 
context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed 
spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration 
deployConf) {
+        AbstractFlinkSpec deployedSpec = 
ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable 
spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? 
deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the currently deployed Flink resource spec should be 
rolled back to the stable
+     * spec. This includes validating the current deployment status, config 
and checking if the last
+     * reconciled spec did not become stable within the configured grace 
period.
+     *
+     * <p>Rollbacks are only supported to previously running resource specs 
with HA enabled.
+     *
+     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param configuration Flink cluster configuration.
+     * @return
+     */
+    private boolean shouldRollBack(

Review Comment:
   IIUC, we also enable the rollback feature here for session jobs, right? 
But we still do not have the mark-stable logic for them yet. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to