Aitozi commented on code in PR #274: URL: https://github.com/apache/flink-kubernetes-operator/pull/274#discussion_r906991319
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ########## @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler.deployment; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.status.CommonStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; +import org.apache.flink.kubernetes.operator.reconciler.Reconciler; +import 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** + * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink + * related resources including initial deployments, upgrades, rollbacks etc. + */ +public abstract class AbstractFlinkResourceReconciler< + CR extends AbstractFlinkResource<SPEC, STATUS>, + SPEC extends AbstractFlinkSpec, + STATUS extends CommonStatus<SPEC>> + implements Reconciler<CR> { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class); + + protected final FlinkConfigManager configManager; + protected final EventRecorder eventRecorder; + protected final KubernetesClient kubernetesClient; + protected final FlinkService flinkService; + + public AbstractFlinkResourceReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkConfigManager configManager, + EventRecorder eventRecorder) { + this.kubernetesClient = kubernetesClient; + this.flinkService = flinkService; + this.configManager = configManager; + this.eventRecorder = eventRecorder; + } + + @Override + public final void reconcile(CR cr, Context ctx) throws Exception { + var spec = cr.getSpec(); + var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx); + var status = cr.getStatus(); + + // If the resource is not ready for reconciliation we 
simply return + if (!readyToReconcile(cr, ctx, deployConfig)) { + LOG.info("Not ready for reconciliation yet..."); + return; + } + + var firstDeployment = status.getReconciliationStatus().getLastReconciledSpec() == null; + + // If this is the first deployment for the resource we simply submit the job and return. + // No further logic is required at this point. + if (firstDeployment) { + LOG.info("Deploying for the first time"); + deploy( + cr.getMetadata(), + spec, + status, + deployConfig, + Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath), + false); + ReconciliationUtils.updateForSpecReconciliationSuccess( + cr, JobState.RUNNING, deployConfig); + return; + } + + var reconciliationStatus = cr.getStatus().getReconciliationStatus(); + SPEC lastReconciledSpec = + cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec(); + SPEC currentDeploySpec = cr.getSpec(); + + boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec); + var observeConfig = getObserveConfig(cr, ctx); + + if (specChanged) { + if (checkNewSpecAlreadyDeployed(cr, deployConfig)) { + return; + } + LOG.info("Reconciling spec change"); + reconcileSpecChange(cr, observeConfig, deployConfig); + } else if (shouldRollBack(reconciliationStatus, observeConfig)) { + // Rollbacks are executed in two steps, we initiate it first then return + if (initiateRollBack(status)) { + return; + } + LOG.warn("Executing rollback operation"); + rollback(cr, ctx, observeConfig); + } else if (!reconcileOtherChanges(cr, observeConfig)) { + LOG.info("Resource fully reconciled, nothing to do..."); + } + } + + /** + * Get Flink configuration object for deploying the given spec using {@link #deploy}. + * + * @param meta ObjectMeta of the related resource. + * @param spec Spec for which the config should be created. + * @param ctx Reconciliation context. + * @return Deployment configuration. 
+ */ + protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx); + + /** + * Get Flink configuration for client interactions with the running Flink deployment/session + * job. + * + * @param resource Related Flink resource. + * @param context Reconciliation context. + * @return Observe configuration. + */ + protected abstract Configuration getObserveConfig(CR resource, Context context); + + /** + * Check whether the given Flink resource is ready to be reconciled or we are still waiting for + * any pending operation or condition first. + * + * @param cr Related Flink resource. + * @param ctx Reconciliation context. + * @param deployConfig Deployment configuration. + * @return True if the resource is ready to be reconciled. + */ + protected abstract boolean readyToReconcile(CR cr, Context ctx, Configuration deployConfig); + + /** + * Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the + * status accordingly. + * + * @param cr Related Flink resource. + * @param observeConfig Observe configuration. + * @param deployConfig Deployment configuration. + * @throws Exception + */ + protected abstract void reconcileSpecChange( + CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception; + + /** + * Rollback deployed resource to the last stable spec. + * + * @param cr Related Flink resource. + * @param ctx Reconciliation context. + * @param observeConfig Observe configuration. + * @throws Exception + */ + protected abstract void rollback(CR cr, Context ctx, Configuration observeConfig) + throws Exception; + + /** + * Reconcile any other changes required for this resource that are specific to the reconciler + * implementation. + * + * @param cr Related Flink resource. + * @param observeConfig Observe configuration. + * @return True if any further reconciliation action was taken. 
+ * @throws Exception + */ + protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig) + throws Exception; + + @Override + public final DeleteControl cleanup(CR resource, Context context) { + return cleanupInternal(resource, context); + } + + /** + * Deploys the target resource spec to Kubernetes. + * + * @param meta ObjectMeta of the related resource. + * @param spec Spec that should be deployed to Kubernetes. + * @param status Status object of the resource + * @param deployConfig Flink conf for the deployment. + * @param savepoint Optional savepoint path for applications and session jobs. + * @param requireHaMetadata Flag used by application deployments to validate HA metadata + * @throws Exception + */ + protected abstract void deploy( + ObjectMeta meta, + SPEC spec, + STATUS status, + Configuration deployConfig, + Optional<String> savepoint, + boolean requireHaMetadata) + throws Exception; + + /** + * Shut down and clean up all Flink job/cluster resources. + * + * @param resource Resource being reconciled. + * @param context Current context. + * @return DeleteControl object. + */ + protected abstract DeleteControl cleanupInternal(CR resource, Context context); + + /** + * Checks whether the desired spec already matches the currently deployed spec. If they match + * the resource status is updated to reflect successful reconciliation. + * + * @param resource Resource being reconciled. + * @param deployConf Deploy configuration for the Flink resource. + * @return True if desired spec was already deployed. + */ + private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) { + AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource); + if (resource.getSpec().equals(deployedSpec)) { + LOG.info( + "The new spec matches the currently deployed last stable spec. No upgrade needed."); + ReconciliationUtils.updateForSpecReconciliationSuccess( + resource, + deployedSpec.getJob() != null ? 
deployedSpec.getJob().getState() : null, + deployConf); + return true; + } + return false; + } + + /** + * Checks whether the currently deployed Flink resource spec should be rolled back to the stable + * spec. This includes validating the current deployment status, config and checking if the last + * reconciled spec did not become stable within the configured grace period. + * + * <p>Rollbacks are only supported to previously running resource specs with HA enabled. + * + * @param reconciliationStatus ReconciliationStatus of the resource. + * @param configuration Flink cluster configuration. + * @return + */ + private boolean shouldRollBack( Review Comment: IIUC, we also enable the rollback feature here for the session job, right? But we do not have mark-stable logic for it yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
