This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 3a49555e49df780c4efa8ecd4b679c2895766e29 Author: Gyula Fora <[email protected]> AuthorDate: Tue Jun 21 14:43:58 2022 +0200 [FLINK-28180] Unify Application and SessionJob reconciler logic --- .../flink/kubernetes/operator/FlinkOperator.java | 5 +- .../operator/artifact/ArtifactManager.java | 12 +- .../operator/config/FlinkConfigManager.java | 8 +- .../observer/sessionjob/SessionJobObserver.java | 9 +- .../operator/reconciler/ReconciliationUtils.java | 76 +---- .../deployment/AbstractDeploymentReconciler.java | 95 ------ .../AbstractFlinkResourceReconciler.java | 355 +++++++++++++++++++++ .../deployment/AbstractJobReconciler.java | 211 ++++++++++++ .../deployment/ApplicationReconciler.java | 253 ++++----------- .../reconciler/deployment/SessionReconciler.java | 122 +++---- .../sessionjob/FlinkSessionJobReconciler.java | 211 ------------ .../reconciler/sessionjob/SessionJobHelper.java | 61 ---- .../sessionjob/SessionJobReconciler.java | 147 +++++++++ .../kubernetes/operator/service/FlinkService.java | 57 ++-- .../operator/validation/DefaultValidator.java | 2 +- .../kubernetes/operator/TestingFlinkService.java | 47 +-- .../operator/artifact/ArtifactManagerTest.java | 4 +- .../sessionjob/SessionJobObserverTest.java | 8 +- .../deployment/ApplicationReconcilerTest.java | 8 +- ...ilerTest.java => SessionJobReconcilerTest.java} | 145 ++++----- .../operator/service/FlinkServiceTest.java | 10 +- 21 files changed, 1001 insertions(+), 845 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 5e93866..1b0daea 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -33,7 +33,7 @@ import 
org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory; import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver; import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory; -import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler; +import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -132,8 +132,9 @@ public class FlinkOperator { } private void registerSessionJobController() { - var reconciler = new FlinkSessionJobReconciler(client, flinkService, configManager); var eventRecorder = EventRecorder.create(client, listeners); + var reconciler = + new SessionJobReconciler(client, flinkService, configManager, eventRecorder); var statusRecorder = StatusRecorder.<FlinkSessionJobStatus>create(client, listeners); var observer = new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java index fa9cf98..ec370de 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java @@ -19,9 +19,10 @@ package org.apache.flink.kubernetes.operator.artifact; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; +import 
org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; import org.apache.flink.util.FlinkRuntimeException; +import io.fabric8.kubernetes.api.model.ObjectMeta; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,15 +65,14 @@ public class ArtifactManager { } } - public String generateJarDir(FlinkSessionJob sessionJob) { + public String generateJarDir(ObjectMeta meta, FlinkSessionJobSpec spec) { return String.join( File.separator, new String[] { new File(configManager.getOperatorConfiguration().getArtifactsBaseDir()) - .getAbsolutePath(), - sessionJob.getMetadata().getNamespace(), - sessionJob.getSpec().getDeploymentName(), - sessionJob.getMetadata().getName() + .getAbsolutePath(), + meta.getNamespace(), + spec.getDeploymentName(), meta.getName() }); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 0895f29..e7fbbfc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -43,7 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
java.time.Duration; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; @@ -161,12 +160,11 @@ public class FlinkConfigManager { } public Configuration getSessionJobConfig( - FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) { + FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) { Configuration sessionJobConfig = getObserveConfig(deployment); // merge session job specific config - Map<String, String> sessionJobFlinkConfiguration = - flinkSessionJob.getSpec().getFlinkConfiguration(); + var sessionJobFlinkConfiguration = sessionJobSpec.getFlinkConfiguration(); if (sessionJobFlinkConfiguration != null) { sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java index 6169a4d..78922f1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java @@ -26,7 +26,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.observer.SavepointObserver; import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext; -import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper; +import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.SavepointUtils; @@ -103,13 +103,12 
@@ public class SessionJobObserver implements Observer<FlinkSessionJob> { Optional<FlinkDeployment> flinkDepOpt = context.getSecondaryResource(FlinkDeployment.class); - var helper = new SessionJobHelper(flinkSessionJob, LOG); - - if (!helper.sessionClusterReady(flinkDepOpt)) { + if (!SessionJobReconciler.sessionClusterReady(flinkDepOpt)) { return; } - var deployedConfig = configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob); + var deployedConfig = + configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec()); var jobFound = jobStatusObserver.observe( flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index 63d0f15..ddbc79f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -19,18 +19,14 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; -import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import 
org.apache.flink.kubernetes.operator.crd.status.CommonStatus; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo; @@ -38,7 +34,6 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.metrics.MetricManager; -import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.util.Preconditions; @@ -55,8 +50,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.time.Duration; -import java.time.Instant; import java.util.Optional; /** Reconciliation utilities. 
*/ @@ -197,18 +190,19 @@ public class ReconciliationUtils { } public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously( - FlinkDeployment flinkApp, FlinkConfigManager configManager) { + AbstractFlinkResource<?, ?> flinkApp, Configuration observeConfig) { - FlinkDeploymentSpec deployedSpec = getDeployedSpec(flinkApp); + var deployedSpec = getDeployedSpec(flinkApp); UpgradeMode previousUpgradeMode = deployedSpec.getJob().getUpgradeMode(); UpgradeMode currentUpgradeMode = flinkApp.getSpec().getJob().getUpgradeMode(); return previousUpgradeMode != UpgradeMode.LAST_STATE && currentUpgradeMode == UpgradeMode.LAST_STATE - && !FlinkUtils.isKubernetesHAActivated(configManager.getObserveConfig(flinkApp)); + && !FlinkUtils.isKubernetesHAActivated(observeConfig); } - public static FlinkDeploymentSpec getDeployedSpec(FlinkDeployment deployment) { + public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec( + AbstractFlinkResource<SPEC, ?> deployment) { var reconciliationStatus = deployment.getStatus().getReconciliationStatus(); var reconciliationState = reconciliationStatus.getState(); if (reconciliationState != ReconciliationState.ROLLED_BACK) { @@ -267,66 +261,6 @@ public class ReconciliationUtils { } } - public static boolean shouldRollBack( - FlinkService flinkService, - ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus, - Configuration configuration) { - - if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) { - return true; - } - - if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED) - || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK - || reconciliationStatus.isLastReconciledSpecStable()) { - return false; - } - - var lastStableSpec = reconciliationStatus.deserializeLastStableSpec(); - if (lastStableSpec != null - && lastStableSpec.getJob() != null - && lastStableSpec.getJob().getState() == JobState.SUSPENDED) { - // Should not roll back to suspended state - 
return false; - } - - Duration readinessTimeout = - configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT); - if (!Instant.now() - .minus(readinessTimeout) - .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()))) { - return false; - } - - var haDataAvailable = flinkService.isHaMetadataAvailable(configuration); - if (!haDataAvailable) { - LOG.warn("Rollback is not possible due to missing HA metadata"); - } - return haDataAvailable; - } - - public static boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment deployment) { - - if (!ReconciliationUtils.jmMissingForRunningDeployment(deployment) - || !conf.get( - KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) { - return false; - } - - if (!FlinkUtils.isKubernetesHAActivated(conf)) { - LOG.warn("Could not recover lost deployment without HA enabled"); - return false; - } - return true; - } - - private static boolean jmMissingForRunningDeployment(FlinkDeployment deployment) { - var deployedJob = getDeployedSpec(deployment).getJob(); - return (deployedJob == null || deployedJob.getState() == JobState.RUNNING) - && (deployment.getStatus().getJobManagerDeploymentStatus() - == JobManagerDeploymentStatus.MISSING); - } - public static boolean isJobInTerminalState(CommonStatus<?> status) { var jobState = status.getJobStatus().getState(); return org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java deleted file mode 100644 index 16d7b44..0000000 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.operator.reconciler.deployment; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; -import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; -import org.apache.flink.kubernetes.operator.reconciler.Reconciler; -import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; -import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.kubernetes.operator.utils.EventRecorder; - -import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** BaseReconciler with functionality that is common to job and 
session modes. */ -public abstract class AbstractDeploymentReconciler implements Reconciler<FlinkDeployment> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractDeploymentReconciler.class); - - protected final FlinkConfigManager configManager; - protected final EventRecorder eventRecorder; - protected final KubernetesClient kubernetesClient; - protected final FlinkService flinkService; - - public AbstractDeploymentReconciler( - KubernetesClient kubernetesClient, - FlinkService flinkService, - FlinkConfigManager configManager, - EventRecorder eventRecorder) { - this.kubernetesClient = kubernetesClient; - this.flinkService = flinkService; - this.configManager = configManager; - this.eventRecorder = eventRecorder; - } - - @Override - public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { - shutdown(flinkApp); - return DeleteControl.defaultDelete(); - } - - protected boolean initiateRollBack(FlinkDeploymentStatus status) { - ReconciliationStatus<?> reconciliationStatus = status.getReconciliationStatus(); - if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) { - LOG.warn("Preparing to roll back to last stable spec."); - if (StringUtils.isEmpty(status.getError())) { - status.setError( - "Deployment is not ready within the configured timeout, rolling back."); - } - reconciliationStatus.setState(ReconciliationState.ROLLING_BACK); - return true; - } - return false; - } - - protected boolean newSpecIsAlreadyDeployed(FlinkDeployment flinkApp, Configuration deployConf) { - FlinkDeploymentSpec deployedSpec = ReconciliationUtils.getDeployedSpec(flinkApp); - if (flinkApp.getSpec().equals(deployedSpec)) { - LOG.info( - "The new spec matches the currently deployed last stable spec. No upgrade needed."); - ReconciliationUtils.updateForSpecReconciliationSuccess( - flinkApp, - deployedSpec.getJob() != null ? 
deployedSpec.getJob().getState() : null, - deployConf); - return true; - } - return false; - } - - protected abstract void shutdown(FlinkDeployment flinkApp); -} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java new file mode 100644 index 0000000..2e0d592 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.reconciler.deployment; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.status.CommonStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; +import org.apache.flink.kubernetes.operator.reconciler.Reconciler; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** + * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink + * related resources including initial deployments, upgrades, rollbacks etc. 
+ */ +public abstract class AbstractFlinkResourceReconciler< + CR extends AbstractFlinkResource<SPEC, STATUS>, + SPEC extends AbstractFlinkSpec, + STATUS extends CommonStatus<SPEC>> + implements Reconciler<CR> { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class); + + protected final FlinkConfigManager configManager; + protected final EventRecorder eventRecorder; + protected final KubernetesClient kubernetesClient; + protected final FlinkService flinkService; + + public AbstractFlinkResourceReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkConfigManager configManager, + EventRecorder eventRecorder) { + this.kubernetesClient = kubernetesClient; + this.flinkService = flinkService; + this.configManager = configManager; + this.eventRecorder = eventRecorder; + } + + @Override + public final void reconcile(CR cr, Context ctx) throws Exception { + var spec = cr.getSpec(); + var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx); + var status = cr.getStatus(); + + // If the resource is not ready for reconciliation we simply return + if (!readyToReconcile(cr, ctx, deployConfig)) { + LOG.info("Not ready for reconciliation yet..."); + return; + } + + var firstDeployment = status.getReconciliationStatus().getLastReconciledSpec() == null; + + // If this is the first deployment for the resource we simply submit the job and return. + // No further logic is required at this point. 
+ if (firstDeployment) { + LOG.info("Deploying for the first time"); + deploy( + cr.getMetadata(), + spec, + status, + deployConfig, + Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath), + false); + ReconciliationUtils.updateForSpecReconciliationSuccess( + cr, JobState.RUNNING, deployConfig); + return; + } + + var reconciliationStatus = cr.getStatus().getReconciliationStatus(); + SPEC lastReconciledSpec = + cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec(); + SPEC currentDeploySpec = cr.getSpec(); + + boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec); + var observeConfig = getObserveConfig(cr, ctx); + + if (specChanged) { + if (checkNewSpecAlreadyDeployed(cr, deployConfig)) { + return; + } + LOG.info("Reconciling spec change"); + reconcileSpecChange(cr, observeConfig, deployConfig); + } else if (shouldRollBack(reconciliationStatus, observeConfig)) { + // Rollbacks are executed in two steps, we initiate it first then return + if (initiateRollBack(status)) { + return; + } + LOG.warn("Executing rollback operation"); + rollback(cr, ctx, observeConfig); + } else if (!reconcileOtherChanges(cr, observeConfig)) { + LOG.info("Resource fully reconciled, nothing to do..."); + } + } + + /** + * Get Flink configuration object for deploying the given spec using {@link #deploy}. + * + * @param meta ObjectMeta of the related resource. + * @param spec Spec for which the config should be created. + * @param ctx Reconciliation context. + * @return Deployment configuration. + */ + protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx); + + /** + * Get Flink configuration for client interactions with the running Flink deployment/session + * job. + * + * @param resource Related Flink resource. + * @param context Reconciliation context. + * @return Observe configuration. 
+ */ + protected abstract Configuration getObserveConfig(CR resource, Context context); + + /** + * Check whether the given Flink resource is ready to be reconciled or we are still waiting for + * any pending operation or condition first. + * + * @param cr Related Flink resource. + * @param ctx Reconciliation context. + * @param deployConfig Deployment configuration. + * @return True if the resource is ready to be reconciled. + */ + protected abstract boolean readyToReconcile(CR cr, Context ctx, Configuration deployConfig); + + /** + * Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the + * status accordingly. + * + * @param cr Related Flink resource. + * @param observeConfig Observe configuration. + * @param deployConfig Deployment configuration. + * @throws Exception + */ + protected abstract void reconcileSpecChange( + CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception; + + /** + * Rollback deployed resource to the last stable spec. + * + * @param cr Related Flink resource. + * @param ctx Reconciliation context. + * @param observeConfig Observe configuration. + * @throws Exception + */ + protected abstract void rollback(CR cr, Context ctx, Configuration observeConfig) + throws Exception; + + /** + * Reconcile any other changes required for this resource that are specific to the reconciler + * implementation. + * + * @param cr Related Flink resource. + * @param observeConfig Observe configuration. + * @return True if any further reconciliation action was taken. + * @throws Exception + */ + protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig) + throws Exception; + + @Override + public final DeleteControl cleanup(CR resource, Context context) { + return cleanupInternal(resource, context); + } + + /** + * Deploys the target resource spec to Kubernetes. + * + * @param meta ObjectMeta of the related resource. + * @param spec Spec that should be deployed to Kubernetes. 
+ * @param status Status object of the resource + * @param deployConfig Flink conf for the deployment. + * @param savepoint Optional savepoint path for applications and session jobs. + * @param requireHaMetadata Flag used by application deployments to validate HA metadata + * @throws Exception + */ + protected abstract void deploy( + ObjectMeta meta, + SPEC spec, + STATUS status, + Configuration deployConfig, + Optional<String> savepoint, + boolean requireHaMetadata) + throws Exception; + + /** + * Shut down and clean up all Flink job/cluster resources. + * + * @param resource Resource being reconciled. + * @param context Current context. + * @return DeleteControl object. + */ + protected abstract DeleteControl cleanupInternal(CR resource, Context context); + + /** + * Checks whether the desired spec already matches the currently deployed spec. If they match + * the resource status is updated to reflect successful reconciliation. + * + * @param resource Resource being reconciled. + * @param deployConf Deploy configuration for the Flink resource. + * @return True if desired spec was already deployed. + */ + private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) { + AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource); + if (resource.getSpec().equals(deployedSpec)) { + LOG.info( + "The new spec matches the currently deployed last stable spec. No upgrade needed."); + ReconciliationUtils.updateForSpecReconciliationSuccess( + resource, + deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null, + deployConf); + return true; + } + return false; + } + + /** + * Checks whether the currently deployed Flink resource spec should be rolled back to the stable + * spec. This includes validating the current deployment status, config and checking if the last + * reconciled spec did not become stable within the configured grace period. 
+ * + * <p>Rollbacks are only supported to previously running resource specs with HA enabled. + * + * @param reconciliationStatus ReconciliationStatus of the resource. + * @param configuration Flink cluster configuration. + * @return + */ + private boolean shouldRollBack( + ReconciliationStatus<SPEC> reconciliationStatus, Configuration configuration) { + + if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) { + return true; + } + + if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED) + || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK + || reconciliationStatus.isLastReconciledSpecStable()) { + return false; + } + + var lastStableSpec = reconciliationStatus.deserializeLastStableSpec(); + if (lastStableSpec != null + && lastStableSpec.getJob() != null + && lastStableSpec.getJob().getState() == JobState.SUSPENDED) { + // Should not roll back to suspended state + return false; + } + + Duration readinessTimeout = + configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT); + if (!Instant.now() + .minus(readinessTimeout) + .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()))) { + return false; + } + + var haDataAvailable = flinkService.isHaMetadataAvailable(configuration); + if (!haDataAvailable) { + LOG.warn("Rollback is not possible due to missing HA metadata"); + } + return haDataAvailable; + } + + /** + * Initiate rollback process by changing the {@link ReconciliationState} in the status. + * + * @param status Resource status. + * @return True if a new rollback was initiated. 
+ */ + private boolean initiateRollBack(STATUS status) { + var reconciliationStatus = status.getReconciliationStatus(); + if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) { + LOG.warn("Preparing to roll back to last stable spec."); + if (StringUtils.isEmpty(status.getError())) { + status.setError( + "Deployment is not ready within the configured timeout, rolling back."); + } + reconciliationStatus.setState(ReconciliationState.ROLLING_BACK); + return true; + } + return false; + } + + /** + * Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated. This + * is triggered only if, jm deployment missing, recovery config and HA enabled. This logic is + * only used by the Session and Application reconcilers. + * + * @param conf Flink cluster configuration. + * @param deployment FlinkDeployment object. + * @return True if recovery should be executed. + */ + protected static boolean shouldRecoverDeployment( + Configuration conf, FlinkDeployment deployment) { + + if (!jmMissingForRunningDeployment(deployment) + || !conf.get( + KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) { + return false; + } + + if (!FlinkUtils.isKubernetesHAActivated(conf)) { + LOG.warn("Could not recover lost deployment without HA enabled"); + return false; + } + return true; + } + + private static boolean jmMissingForRunningDeployment(FlinkDeployment deployment) { + var deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob(); + return (deployedJob == null || deployedJob.getState() == JobState.RUNNING) + && (deployment.getStatus().getJobManagerDeploymentStatus() + == JobManagerDeploymentStatus.MISSING); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java new file mode 100644 index 
0000000..3ca8735 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler.deployment; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.crd.status.CommonStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import 
org.apache.flink.kubernetes.operator.utils.SavepointUtils; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +/** + * Reconciler responsible for handling the job lifecycle according to the desired and current + * states. + */ +public abstract class AbstractJobReconciler< + CR extends AbstractFlinkResource<SPEC, STATUS>, + SPEC extends AbstractFlinkSpec, + STATUS extends CommonStatus<SPEC>> + extends AbstractFlinkResourceReconciler<CR, SPEC, STATUS> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractJobReconciler.class); + + public AbstractJobReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkConfigManager configManager, + EventRecorder eventRecorder) { + super(kubernetesClient, flinkService, configManager, eventRecorder); + } + + @Override + public boolean readyToReconcile(CR resource, Context context, Configuration deployConfig) { + if (shouldWaitForPendingSavepoint( + resource.getStatus().getJobStatus(), + getDeployConfig(resource.getMetadata(), resource.getSpec(), context))) { + LOG.info("Delaying job reconciliation until pending savepoint is completed."); + return false; + } + return true; + } + + private boolean shouldWaitForPendingSavepoint(JobStatus jobStatus, Configuration conf) { + return !conf.getBoolean( + KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT) + && SavepointUtils.savepointInProgress(jobStatus); + } + + @Override + protected void reconcileSpecChange( + CR resource, Configuration observeConfig, Configuration deployConfig) throws Exception { + var deployMeta = resource.getMetadata(); + STATUS status = resource.getStatus(); + var reconciliationStatus = status.getReconciliationStatus(); + SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec(); 
+ SPEC currentDeploySpec = resource.getSpec(); + + JobState currentJobState = lastReconciledSpec.getJob().getState(); + JobState desiredJobState = currentDeploySpec.getJob().getState(); + JobState newState = currentJobState; + if (currentJobState == JobState.RUNNING) { + if (desiredJobState == JobState.RUNNING) { + LOG.info("Upgrading/Restarting running job, suspending first..."); + } + Optional<UpgradeMode> availableUpgradeMode = + getAvailableUpgradeMode(resource, deployConfig, observeConfig); + if (availableUpgradeMode.isEmpty()) { + return; + } + // We must record the upgrade mode used to the status later + currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get()); + cancelJob(resource, availableUpgradeMode.get(), observeConfig); + newState = JobState.SUSPENDED; + } + if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) { + restoreJob( + deployMeta, + currentDeploySpec, + status, + deployConfig, + // We decide to enforce HA based on how job was previously suspended + lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE); + newState = JobState.RUNNING; + } + ReconciliationUtils.updateForSpecReconciliationSuccess(resource, newState, deployConfig); + } + + protected Optional<UpgradeMode> getAvailableUpgradeMode( + CR resource, Configuration deployConfig, Configuration observeConfig) { + var status = resource.getStatus(); + var upgradeMode = resource.getSpec().getJob().getUpgradeMode(); + var changedToLastStateWithoutHa = + ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously( + resource, observeConfig); + + if (upgradeMode == UpgradeMode.STATELESS) { + LOG.info("Stateless job, ready for upgrade"); + return Optional.of(upgradeMode); + } + + if (ReconciliationUtils.isJobInTerminalState(status)) { + LOG.info( + "Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint"); + return Optional.of(UpgradeMode.SAVEPOINT); + } + + if 
(ReconciliationUtils.isJobRunning(status)) { + LOG.info("Job is in running state, ready for upgrade with {}", upgradeMode); + if (changedToLastStateWithoutHa) { + LOG.info( + "Using savepoint upgrade mode when switching to last-state without HA previously enabled"); + return Optional.of(UpgradeMode.SAVEPOINT); + } else { + return Optional.of(upgradeMode); + } + } + + return Optional.empty(); + } + + protected void restoreJob( + ObjectMeta meta, + SPEC spec, + STATUS status, + Configuration deployConfig, + boolean requireHaMetadata) + throws Exception { + Optional<String> savepointOpt = Optional.empty(); + + if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) { + savepointOpt = + Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint()) + .flatMap(s -> Optional.ofNullable(s.getLocation())); + } + + deploy(meta, spec, status, deployConfig, savepointOpt, requireHaMetadata); + } + + @Override + protected void rollback(CR resource, Context context, Configuration observeConfig) + throws Exception { + var reconciliationStatus = resource.getStatus().getReconciliationStatus(); + var rollbackSpec = reconciliationStatus.deserializeLastStableSpec(); + + UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode(); + + cancelJob( + resource, + upgradeMode == UpgradeMode.STATELESS + ? UpgradeMode.STATELESS + : UpgradeMode.LAST_STATE, + observeConfig); + + restoreJob( + resource.getMetadata(), + rollbackSpec, + resource.getStatus(), + getDeployConfig(resource.getMetadata(), rollbackSpec, context), + upgradeMode != UpgradeMode.STATELESS); + + reconciliationStatus.setState(ReconciliationState.ROLLED_BACK); + } + + @Override + public boolean reconcileOtherChanges(CR resource, Configuration observeConfig) + throws Exception { + return SavepointUtils.triggerSavepointIfNeeded(flinkService, resource, observeConfig); + } + + /** + * Cancel the job for the given resource using the specified upgrade mode. 
+ * + * @param resource Related Flink resource. + * @param upgradeMode Upgrade mode used to cancel the job. + * @param observeConfig Observe configuration. + * @throws Exception Error during cancellation. + */ + protected abstract void cancelJob( + CR resource, UpgradeMode upgradeMode, Configuration observeConfig) throws Exception; +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 595b7ef..b89a88b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -17,34 +17,28 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; -import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; -import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; -import org.apache.flink.kubernetes.operator.utils.SavepointUtils; import org.apache.flink.runtime.highavailability.JobResultStoreOptions; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; import lombok.SneakyThrows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +46,9 @@ import org.slf4j.LoggerFactory; import java.util.Optional; import java.util.UUID; -/** - * Reconciler responsible for handling the job lifecycle according to the desired and current - * states. - */ -public class ApplicationReconciler extends AbstractDeploymentReconciler { +/** Reconciler for Flink Application deployments. 
*/ +public class ApplicationReconciler + extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> { private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class); @@ -69,166 +61,30 @@ public class ApplicationReconciler extends AbstractDeploymentReconciler { } @Override - public void reconcile(FlinkDeployment flinkApp, Context context) throws Exception { - ObjectMeta deployMeta = flinkApp.getMetadata(); - FlinkDeploymentStatus status = flinkApp.getStatus(); - ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus = - status.getReconciliationStatus(); - FlinkDeploymentSpec lastReconciledSpec = - reconciliationStatus.deserializeLastReconciledSpec(); - FlinkDeploymentSpec currentDeploySpec = flinkApp.getSpec(); - - JobSpec desiredJobSpec = currentDeploySpec.getJob(); - Configuration deployConfig = configManager.getDeployConfig(deployMeta, currentDeploySpec); - if (lastReconciledSpec == null) { - LOG.debug("Deploying application for the first time"); - deployFlinkJob( - deployMeta, - desiredJobSpec, - status, - deployConfig, - Optional.ofNullable(desiredJobSpec.getInitialSavepointPath()), - false); - IngressUtils.updateIngressRules( - deployMeta, currentDeploySpec, deployConfig, kubernetesClient); - ReconciliationUtils.updateForSpecReconciliationSuccess( - flinkApp, JobState.RUNNING, deployConfig); - return; - } - - if (!deployConfig.getBoolean( - KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT) - && SavepointUtils.savepointInProgress(status.getJobStatus())) { - LOG.info("Delaying job reconciliation until pending savepoint is completed."); - return; - } - - Configuration observeConfig = configManager.getObserveConfig(flinkApp); - boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec); - if (specChanged) { - if (newSpecIsAlreadyDeployed(flinkApp, deployConfig)) { - return; - } - LOG.debug("Detected spec change, starting upgrade process."); - JobState currentJobState = 
lastReconciledSpec.getJob().getState(); - JobState desiredJobState = desiredJobSpec.getState(); - JobState stateAfterReconcile = currentJobState; - if (currentJobState == JobState.RUNNING) { - if (desiredJobState == JobState.RUNNING) { - LOG.info("Upgrading/Restarting running job, suspending first..."); - } - Optional<UpgradeMode> availableUpgradeMode = - getAvailableUpgradeMode(flinkApp, deployConfig); - if (availableUpgradeMode.isEmpty()) { - return; - } - // We must record the upgrade mode used to the status later - desiredJobSpec.setUpgradeMode(availableUpgradeMode.get()); - flinkService.cancelJob(flinkApp, availableUpgradeMode.get()); - stateAfterReconcile = JobState.SUSPENDED; - } - if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) { - restoreJob( - deployMeta, - desiredJobSpec, - status, - deployConfig, - // We decide to enforce HA based on how job was previously suspended - lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE); - stateAfterReconcile = JobState.RUNNING; - } - ReconciliationUtils.updateForSpecReconciliationSuccess( - flinkApp, stateAfterReconcile, deployConfig); - IngressUtils.updateIngressRules( - deployMeta, currentDeploySpec, deployConfig, kubernetesClient); - } else if (ReconciliationUtils.shouldRollBack( - flinkService, reconciliationStatus, observeConfig)) { - rollbackApplication(flinkApp); - } else if (ReconciliationUtils.shouldRecoverDeployment(observeConfig, flinkApp)) { - recoverJmDeployment(flinkApp, observeConfig); - } else { - if (!SavepointUtils.triggerSavepointIfNeeded(flinkService, flinkApp, observeConfig)) { - LOG.info("Deployment is fully reconciled, nothing to do."); - } - } + protected Configuration getObserveConfig(FlinkDeployment deployment, Context context) { + return configManager.getObserveConfig(deployment); } - private void rollbackApplication(FlinkDeployment flinkApp) throws Exception { - ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus = - 
flinkApp.getStatus().getReconciliationStatus(); - - if (initiateRollBack(flinkApp.getStatus())) { - return; - } - - LOG.warn("Executing rollback operation"); - FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec(); - Configuration rollbackConfig = - configManager.getDeployConfig(flinkApp.getMetadata(), rollbackSpec); - UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode(); - - flinkService.cancelJob( - flinkApp, - upgradeMode == UpgradeMode.STATELESS - ? UpgradeMode.STATELESS - : UpgradeMode.LAST_STATE); - - restoreJob( - flinkApp.getMetadata(), - rollbackSpec.getJob(), - flinkApp.getStatus(), - rollbackConfig, - upgradeMode != UpgradeMode.STATELESS); - - reconciliationStatus.setState(ReconciliationState.ROLLED_BACK); - IngressUtils.updateIngressRules( - flinkApp.getMetadata(), rollbackSpec, rollbackConfig, kubernetesClient); + @Override + protected Configuration getDeployConfig( + ObjectMeta deployMeta, FlinkDeploymentSpec currentDeploySpec, Context context) { + return configManager.getDeployConfig(deployMeta, currentDeploySpec); } - private void recoverJmDeployment(FlinkDeployment deployment, Configuration observeConfig) - throws Exception { - LOG.info("Missing Flink Cluster deployment, trying to recover..."); - FlinkDeploymentSpec specToRecover = ReconciliationUtils.getDeployedSpec(deployment); - restoreJob( - deployment.getMetadata(), - specToRecover.getJob(), - deployment.getStatus(), - observeConfig, - true); - } + @Override + protected Optional<UpgradeMode> getAvailableUpgradeMode( + FlinkDeployment deployment, Configuration deployConfig, Configuration observeConfig) { - private Optional<UpgradeMode> getAvailableUpgradeMode( - FlinkDeployment deployment, Configuration deployConfig) { var status = deployment.getStatus(); - var upgradeMode = deployment.getSpec().getJob().getUpgradeMode(); - var changedToLastStateWithoutHa = - ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously( - deployment, 
configManager); - - if (upgradeMode == UpgradeMode.STATELESS) { - LOG.info("Stateless job, ready for upgrade"); - return Optional.of(upgradeMode); - } - - if (ReconciliationUtils.isJobInTerminalState(status)) { - LOG.info( - "Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint"); - return Optional.of(UpgradeMode.SAVEPOINT); - } + var availableUpgradeMode = + super.getAvailableUpgradeMode(deployment, deployConfig, observeConfig); - if (ReconciliationUtils.isJobRunning(status)) { - LOG.info("Job is in running state, ready for upgrade with {}", upgradeMode); - if (changedToLastStateWithoutHa) { - LOG.info( - "Using savepoint upgrade mode when switching to last-state without HA previously enabled"); - return Optional.of(UpgradeMode.SAVEPOINT); - } else { - return Optional.of(upgradeMode); - } + if (availableUpgradeMode.isPresent()) { + return availableUpgradeMode; } if (FlinkUtils.isKubernetesHAActivated(deployConfig) - && FlinkUtils.isKubernetesHAActivated(configManager.getObserveConfig(deployment)) + && FlinkUtils.isKubernetesHAActivated(observeConfig) && flinkService.isHaMetadataAvailable(deployConfig)) { LOG.info( "Job is not running but HA metadata is available for last state restore, ready for upgrade"); @@ -242,17 +98,17 @@ public class ApplicationReconciler extends AbstractDeploymentReconciler { + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. 
" + "Manual restore required.", "UpgradeFailed"); - } else { - LOG.info( - "Job is not running yet and HA metadata is not available, waiting for upgradeable state"); - return Optional.empty(); } + + LOG.info( + "Job is not running yet and HA metadata is not available, waiting for upgradeable state"); + return Optional.empty(); } - @VisibleForTesting - protected void deployFlinkJob( + @Override + protected void deploy( ObjectMeta meta, - JobSpec jobSpec, + FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Configuration deployConfig, Optional<String> savepoint, @@ -282,27 +138,18 @@ public class ApplicationReconciler extends AbstractDeploymentReconciler { .getFlinkShutdownClusterTimeout() .toSeconds()); } - flinkService.submitApplicationCluster(jobSpec, deployConfig, requireHaMetadata); + flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata); status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + + IngressUtils.updateIngressRules(meta, spec, deployConfig, kubernetesClient); } - private void restoreJob( - ObjectMeta meta, - JobSpec jobSpec, - FlinkDeploymentStatus status, - Configuration deployConfig, - boolean requireHaMetadata) + @Override + protected void cancelJob( + FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration observeConfig) throws Exception { - Optional<String> savepointOpt = Optional.empty(); - - if (jobSpec.getUpgradeMode() != UpgradeMode.STATELESS) { - savepointOpt = - Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint()) - .flatMap(s -> Optional.ofNullable(s.getLocation())); - } - - deployFlinkJob(meta, jobSpec, status, deployConfig, savepointOpt, requireHaMetadata); + flinkService.cancelJob(deployment, upgradeMode, observeConfig); } // Workaround for https://issues.apache.org/jira/browse/FLINK-27569 @@ -323,14 +170,42 @@ public class ApplicationReconciler 
extends AbstractDeploymentReconciler { } } + @Override + public boolean reconcileOtherChanges(FlinkDeployment deployment, Configuration observeConfig) + throws Exception { + if (super.reconcileOtherChanges(deployment, observeConfig)) { + return true; + } + + if (shouldRecoverDeployment(observeConfig, deployment)) { + recoverJmDeployment(deployment, observeConfig); + return true; + } + return false; + } + + private void recoverJmDeployment(FlinkDeployment deployment, Configuration observeConfig) + throws Exception { + LOG.info("Missing Flink Cluster deployment, trying to recover..."); + FlinkDeploymentSpec specToRecover = ReconciliationUtils.getDeployedSpec(deployment); + restoreJob( + deployment.getMetadata(), + specToRecover, + deployment.getStatus(), + observeConfig, + true); + } + @Override @SneakyThrows - protected void shutdown(FlinkDeployment flinkApp) { - var status = flinkApp.getStatus(); + protected DeleteControl cleanupInternal(FlinkDeployment deployment, Context context) { + var status = deployment.getStatus(); if (status.getReconciliationStatus().getLastReconciledSpec() == null) { - flinkService.deleteClusterDeployment(flinkApp.getMetadata(), status, true); + flinkService.deleteClusterDeployment(deployment.getMetadata(), status, true); } else { - flinkService.cancelJob(flinkApp, UpgradeMode.STATELESS); + flinkService.cancelJob( + deployment, UpgradeMode.STATELESS, configManager.getObserveConfig(deployment)); } + return DeleteControl.defaultDelete(); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index ccf74f5..ba8d390 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -33,12 +33,14 @@ import org.apache.flink.kubernetes.operator.utils.EventUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; +import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -46,7 +48,9 @@ import java.util.stream.Collectors; * Reconciler responsible for handling the session cluster lifecycle according to the desired and * current states. */ -public class SessionReconciler extends AbstractDeploymentReconciler { +public class SessionReconciler + extends AbstractFlinkResourceReconciler< + FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> { private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class); @@ -59,42 +63,28 @@ public class SessionReconciler extends AbstractDeploymentReconciler { } @Override - public void reconcile(FlinkDeployment flinkApp, Context context) throws Exception { - FlinkDeploymentStatus status = flinkApp.getStatus(); - ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus = - status.getReconciliationStatus(); - FlinkDeploymentSpec lastReconciledSpec = - reconciliationStatus.deserializeLastReconciledSpec(); - FlinkDeploymentSpec currentDeploySpec = flinkApp.getSpec(); - - if (lastReconciledSpec == null) { - Configuration conf = - configManager.getDeployConfig(flinkApp.getMetadata(), currentDeploySpec); - flinkService.submitSessionCluster(conf); - status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); - IngressUtils.updateIngressRules( - flinkApp.getMetadata(), 
currentDeploySpec, conf, kubernetesClient); - ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, null, conf); - return; - } + protected Configuration getDeployConfig( + ObjectMeta meta, FlinkDeploymentSpec spec, Context ctx) { + return configManager.getDeployConfig(meta, spec); + } - Configuration observeConfig = configManager.getObserveConfig(flinkApp); - boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec); - if (specChanged) { - var deployConf = - configManager.getDeployConfig(flinkApp.getMetadata(), currentDeploySpec); - if (newSpecIsAlreadyDeployed(flinkApp, deployConf)) { - return; - } - LOG.debug("Detected spec change, starting upgrade process."); - upgradeSessionCluster(flinkApp, currentDeploySpec, deployConf); - ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, null, deployConf); - } else if (ReconciliationUtils.shouldRollBack( - flinkService, reconciliationStatus, observeConfig)) { - rollbackSessionCluster(flinkApp); - } else if (ReconciliationUtils.shouldRecoverDeployment(observeConfig, flinkApp)) { - recoverSession(flinkApp, observeConfig); - } + @Override + protected Configuration getObserveConfig(FlinkDeployment resource, Context context) { + return configManager.getObserveConfig(resource); + } + + @Override + protected boolean readyToReconcile( + FlinkDeployment deployment, Context ctx, Configuration deployConfig) { + return true; + } + + @Override + protected void reconcileSpecChange( + FlinkDeployment deployment, Configuration observeConfig, Configuration deployConfig) + throws Exception { + upgradeSessionCluster(deployment, deployment.getSpec(), deployConfig); + ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, null, deployConfig); } private void upgradeSessionCluster( @@ -112,24 +102,33 @@ public class SessionReconciler extends AbstractDeploymentReconciler { .getOperatorConfiguration() .getFlinkShutdownClusterTimeout() .toSeconds()); - flinkService.submitSessionCluster(effectiveConfig); 
- deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); - IngressUtils.updateIngressRules( - deployment.getMetadata(), deploySpec, effectiveConfig, kubernetesClient); + deploy( + deployment.getMetadata(), + deploySpec, + deployment.getStatus(), + effectiveConfig, + Optional.empty(), + false); } - private void recoverSession(FlinkDeployment deployment, Configuration effectiveConfig) + @Override + protected void deploy( + ObjectMeta meta, + FlinkDeploymentSpec spec, + FlinkDeploymentStatus status, + Configuration deployConfig, + Optional<String> savepoint, + boolean requireHaMetadata) throws Exception { - flinkService.submitSessionCluster(effectiveConfig); - deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + flinkService.submitSessionCluster(deployConfig); + status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + IngressUtils.updateIngressRules(meta, spec, deployConfig, kubernetesClient); } - private void rollbackSessionCluster(FlinkDeployment deployment) throws Exception { + @Override + protected void rollback(FlinkDeployment deployment, Context ctx, Configuration observeConfig) + throws Exception { FlinkDeploymentStatus status = deployment.getStatus(); - if (initiateRollBack(status)) { - return; - } - ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus = status.getReconciliationStatus(); FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec(); @@ -140,15 +139,23 @@ public class SessionReconciler extends AbstractDeploymentReconciler { } @Override - protected void shutdown(FlinkDeployment deployment) { - LOG.info("Stopping session cluster"); - flinkService.deleteClusterDeployment( - deployment.getMetadata(), deployment.getStatus(), true); + public boolean reconcileOtherChanges(FlinkDeployment flinkApp, Configuration observeConfig) + throws Exception { + if (shouldRecoverDeployment(observeConfig, flinkApp)) { + 
recoverSession(flinkApp, observeConfig); + return true; + } + return false; } - @Override - public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { + private void recoverSession(FlinkDeployment deployment, Configuration effectiveConfig) + throws Exception { + flinkService.submitSessionCluster(effectiveConfig); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + } + @Override + public DeleteControl cleanupInternal(FlinkDeployment deployment, Context context) { Set<FlinkSessionJob> sessionJobs = context.getSecondaryResources(FlinkSessionJob.class); if (!sessionJobs.isEmpty()) { var error = @@ -158,7 +165,7 @@ public class SessionReconciler extends AbstractDeploymentReconciler { .map(job -> job.getMetadata().getName()) .collect(Collectors.toList())); if (eventRecorder.triggerEvent( - flinkApp, + deployment, EventUtils.Type.Warning, "Cleanup", error, @@ -172,7 +179,10 @@ public class SessionReconciler extends AbstractDeploymentReconciler { .getReconcileInterval() .toMillis()); } else { - return super.cleanup(flinkApp, context); + LOG.info("Stopping session cluster"); + flinkService.deleteClusterDeployment( + deployment.getMetadata(), deployment.getStatus(), true); + return DeleteControl.defaultDelete(); } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java deleted file mode 100644 index 0079d80..0000000 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.operator.reconciler.sessionjob; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobState; -import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; -import org.apache.flink.kubernetes.operator.crd.status.JobStatus; -import org.apache.flink.kubernetes.operator.crd.status.Savepoint; -import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; -import org.apache.flink.kubernetes.operator.reconciler.Reconciler; -import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; -import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.kubernetes.operator.utils.SavepointUtils; -import org.apache.flink.util.Preconditions; - -import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import 
org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.Optional; - -/** The reconciler for the {@link FlinkSessionJob}. */ -public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobReconciler.class); - - private final FlinkConfigManager configManager; - private final KubernetesClient kubernetesClient; - private final FlinkService flinkService; - - public FlinkSessionJobReconciler( - KubernetesClient kubernetesClient, - FlinkService flinkService, - FlinkConfigManager configManager) { - this.kubernetesClient = kubernetesClient; - this.flinkService = flinkService; - this.configManager = configManager; - } - - @Override - public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws Exception { - - SessionJobHelper helper = new SessionJobHelper(flinkSessionJob, LOG); - FlinkSessionJobSpec lastReconciledSpec = - flinkSessionJob - .getStatus() - .getReconciliationStatus() - .deserializeLastReconciledSpec(); - - Optional<FlinkDeployment> flinkDepOptional = - context.getSecondaryResource(FlinkDeployment.class); - - // if session cluster is not ready, we can't do reconcile for the job. 
- if (!helper.sessionClusterReady(flinkDepOptional)) { - return; - } - - Configuration sessionJobConfig = - configManager.getSessionJobConfig(flinkDepOptional.get(), flinkSessionJob); - - if (lastReconciledSpec == null) { - submitAndInitStatus( - flinkSessionJob, - sessionJobConfig, - Optional.ofNullable( - flinkSessionJob.getSpec().getJob().getInitialSavepointPath()) - .orElse(null)); - ReconciliationUtils.updateForSpecReconciliationSuccess( - flinkSessionJob, JobState.RUNNING, sessionJobConfig); - return; - } - - if (!sessionJobConfig.getBoolean( - KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT) - && SavepointUtils.savepointInProgress(flinkSessionJob.getStatus().getJobStatus())) { - LOG.info("Delaying job reconciliation until pending savepoint is completed"); - return; - } - - boolean specChanged = helper.specChanged(lastReconciledSpec); - var jobSpec = flinkSessionJob.getSpec().getJob(); - - if (specChanged) { - JobState currentJobState = lastReconciledSpec.getJob().getState(); - JobState desiredJobState = jobSpec.getState(); - UpgradeMode upgradeMode = jobSpec.getUpgradeMode(); - JobState stateAfterReconcile = currentJobState; - if (currentJobState == JobState.RUNNING) { - if (desiredJobState == JobState.RUNNING) { - LOG.info("Upgrading/Restarting running job, suspending first..."); - } - stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, sessionJobConfig); - } - if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) { - if (upgradeMode == UpgradeMode.STATELESS) { - submitAndInitStatus(flinkSessionJob, sessionJobConfig, null); - } else if (upgradeMode == UpgradeMode.LAST_STATE - || upgradeMode == UpgradeMode.SAVEPOINT) { - restoreFromLastSavepoint(flinkSessionJob, sessionJobConfig); - } - stateAfterReconcile = JobState.RUNNING; - } - ReconciliationUtils.updateForSpecReconciliationSuccess( - flinkSessionJob, stateAfterReconcile, sessionJobConfig); - } else { - if 
(!SavepointUtils.triggerSavepointIfNeeded( - flinkService, flinkSessionJob, sessionJobConfig)) { - LOG.info("Session Job is fully reconciled, nothing to do."); - } - } - } - - @Override - public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) { - Optional<FlinkDeployment> flinkDepOptional = - context.getSecondaryResource(FlinkDeployment.class); - - if (flinkDepOptional.isPresent()) { - String jobID = sessionJob.getStatus().getJobStatus().getJobId(); - if (jobID != null) { - try { - flinkService.cancelSessionJob( - JobID.fromHexString(jobID), - UpgradeMode.STATELESS, - configManager.getSessionJobConfig(flinkDepOptional.get(), sessionJob)); - } catch (Exception e) { - LOG.error("Failed to cancel job.", e); - } - } - } else { - LOG.info("Session cluster deployment not available"); - } - return DeleteControl.defaultDelete(); - } - - private void submitAndInitStatus( - FlinkSessionJob sessionJob, Configuration effectiveConfig, @Nullable String savepoint) - throws Exception { - var jobID = flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig, savepoint); - sessionJob - .getStatus() - .setJobStatus( - new JobStatus() - .toBuilder() - .jobId(jobID.toHexString()) - .state(org.apache.flink.api.common.JobStatus.RECONCILING.name()) - .build()); - } - - private void restoreFromLastSavepoint( - FlinkSessionJob flinkSessionJob, Configuration effectiveConfig) throws Exception { - JobStatus jobStatus = flinkSessionJob.getStatus().getJobStatus(); - Optional<String> savepointOpt = - Optional.ofNullable(jobStatus.getSavepointInfo().getLastSavepoint()) - .flatMap(s -> Optional.ofNullable(s.getLocation())); - - submitAndInitStatus(flinkSessionJob, effectiveConfig, savepointOpt.orElse(null)); - } - - private Optional<String> internalSuspendJob( - FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration effectiveConfig) - throws Exception { - final String jobIdString = sessionJob.getStatus().getJobStatus().getJobId(); - 
Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null"); - return flinkService.cancelSessionJob( - JobID.fromHexString(jobIdString), upgradeMode, effectiveConfig); - } - - private JobState suspendJob( - FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration effectiveConfig) - throws Exception { - final Optional<String> savepointOpt = - internalSuspendJob(sessionJob, upgradeMode, effectiveConfig); - - JobStatus jobStatus = sessionJob.getStatus().getJobStatus(); - JobState stateAfterReconcile = JobState.SUSPENDED; - jobStatus.setState(stateAfterReconcile.name()); - savepointOpt.ifPresent( - location -> { - Savepoint sp = Savepoint.of(location, SavepointTriggerType.UPGRADE); - jobStatus.getSavepointInfo().updateLastSavepoint(sp); - }); - return stateAfterReconcile; - } -} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java deleted file mode 100644 index a1f36c2..0000000 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.operator.reconciler.sessionjob; - -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; -import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; - -import org.slf4j.Logger; - -import java.util.Optional; - -/** A tool for session job management condition checker. */ -public class SessionJobHelper { - - private final Logger logger; - private final FlinkSessionJob sessionJob; - - public SessionJobHelper(FlinkSessionJob sessionJob, Logger logger) { - this.sessionJob = sessionJob; - this.logger = logger; - } - - public boolean specChanged(FlinkSessionJobSpec lastReconciledSpec) { - return !sessionJob.getSpec().equals(lastReconciledSpec); - } - - public boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) { - if (flinkDeploymentOpt.isPresent()) { - var flinkdep = flinkDeploymentOpt.get(); - var jobmanagerDeploymentStatus = flinkdep.getStatus().getJobManagerDeploymentStatus(); - if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) { - logger.info( - "Session cluster deployment is in {} status, not ready for serve", - jobmanagerDeploymentStatus); - return false; - } else { - return true; - } - } else { - logger.warn("Session cluster deployment is not found"); - return false; - } - } -} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java new file mode 100644 index 0000000..7757500 --- /dev/null +++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler.sessionjob; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import 
io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +/** The reconciler for the {@link FlinkSessionJob}. */ +public class SessionJobReconciler + extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> { + + private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class); + + public SessionJobReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkConfigManager configManager, + EventRecorder eventRecorder) { + super(kubernetesClient, flinkService, configManager, eventRecorder); + } + + @Override + protected Configuration getObserveConfig(FlinkSessionJob sessionJob, Context context) { + return getDeployConfig(sessionJob.getMetadata(), sessionJob.getSpec(), context); + } + + @Override + protected Configuration getDeployConfig( + ObjectMeta deployMeta, FlinkSessionJobSpec currentDeploySpec, Context context) { + Optional<FlinkDeployment> deploymentOpt = + context.getSecondaryResource(FlinkDeployment.class); + + if (!sessionClusterReady(deploymentOpt)) { + return null; + } + return configManager.getSessionJobConfig(deploymentOpt.get(), currentDeploySpec); + } + + @Override + public boolean readyToReconcile( + FlinkSessionJob flinkSessionJob, Context context, Configuration deployConfig) { + return sessionClusterReady(context.getSecondaryResource(FlinkDeployment.class)) + && super.readyToReconcile(flinkSessionJob, context, deployConfig); + } + + @Override + protected void deploy( + ObjectMeta meta, + FlinkSessionJobSpec sessionJobSpec, + FlinkSessionJobStatus status, + Configuration deployConfig, + Optional<String> savepoint, + boolean requireHaMetadata) + throws Exception { + var jobID = + flinkService.submitJobToSessionCluster( + meta, sessionJobSpec, deployConfig, savepoint.orElse(null)); + status.setJobStatus( + new JobStatus() + 
.toBuilder() + .jobId(jobID.toHexString()) + .state(org.apache.flink.api.common.JobStatus.RECONCILING.name()) + .build()); + } + + @Override + protected void cancelJob( + FlinkSessionJob resource, UpgradeMode upgradeMode, Configuration observeConfig) + throws Exception { + flinkService.cancelSessionJob(resource, upgradeMode, observeConfig); + } + + @Override + public DeleteControl cleanupInternal(FlinkSessionJob sessionJob, Context context) { + Optional<FlinkDeployment> flinkDepOptional = + context.getSecondaryResource(FlinkDeployment.class); + + if (flinkDepOptional.isPresent()) { + String jobID = sessionJob.getStatus().getJobStatus().getJobId(); + if (jobID != null) { + try { + cancelJob( + sessionJob, + UpgradeMode.STATELESS, + getObserveConfig(sessionJob, context)); + } catch (Exception e) { + LOG.error("Failed to cancel job.", e); + } + } + } else { + LOG.info("Session cluster deployment not available"); + } + return DeleteControl.defaultDelete(); + } + + public static boolean sessionClusterReady(Optional<FlinkDeployment> flinkDeploymentOpt) { + if (flinkDeploymentOpt.isPresent()) { + var flinkdep = flinkDeploymentOpt.get(); + var jobmanagerDeploymentStatus = flinkdep.getStatus().getJobManagerDeploymentStatus(); + if (jobmanagerDeploymentStatus != JobManagerDeploymentStatus.READY) { + LOG.info( + "Session cluster deployment is in {} status, not ready for serve", + jobmanagerDeploymentStatus); + return false; + } else { + return true; + } + } else { + LOG.warn("Session cluster deployment is not found"); + return false; + } + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index 2ba73e0..e2d2994 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -39,6 +39,7 @@ import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; @@ -197,19 +198,20 @@ public class FlinkService { } public JobID submitJobToSessionCluster( - FlinkSessionJob sessionJob, Configuration conf, @Nullable String savepoint) + ObjectMeta meta, + FlinkSessionJobSpec spec, + Configuration conf, + @Nullable String savepoint) throws Exception { - var jarRunResponseBody = runJar(sessionJob, uploadJar(sessionJob, conf), conf, savepoint); + var jarRunResponseBody = + runJar(spec.getJob(), uploadJar(meta, spec, conf), conf, savepoint); var jobID = jarRunResponseBody.getJobId(); LOG.info("Submitted job: {} to session cluster.", jobID); return jobID; } private JarRunResponseBody runJar( - FlinkSessionJob sessionJob, - JarUploadResponseBody response, - Configuration conf, - String savepoint) { + JobSpec job, JarUploadResponseBody response, Configuration conf, String savepoint) { String jarId = response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1); // we generate jobID in advance to help deduplicate job submission. 
@@ -219,7 +221,6 @@ public class FlinkService { JarRunHeaders headers = JarRunHeaders.getInstance(); JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters(); parameters.jarIdPathParameter.resolve(jarId); - JobSpec job = sessionJob.getSpec().getJob(); JarRunRequestBody runRequestBody = new JarRunRequestBody( job.getEntryClass(), @@ -249,17 +250,16 @@ public class FlinkService { } } - private JarUploadResponseBody uploadJar(FlinkSessionJob sessionJob, Configuration conf) - throws Exception { - String targetDir = artifactManager.generateJarDir(sessionJob); - File jarFile = - artifactManager.fetch(sessionJob.getSpec().getJob().getJarURI(), conf, targetDir); + private JarUploadResponseBody uploadJar( + ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception { + String targetDir = artifactManager.generateJarDir(objectMeta, spec); + File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, targetDir); Preconditions.checkArgument( jarFile.exists(), String.format("The jar file %s not exists", jarFile.getAbsolutePath())); JarUploadHeaders headers = JarUploadHeaders.getInstance(); - String clusterId = sessionJob.getSpec().getDeploymentName(); - String namespace = sessionJob.getMetadata().getNamespace(); + String clusterId = spec.getDeploymentName(); + String namespace = objectMeta.getNamespace(); int port = conf.getInteger(RestOptions.PORT); String host = ObjectUtils.firstNonNull( @@ -370,8 +370,8 @@ public class FlinkService { config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress)); } - public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode) throws Exception { - var conf = configManager.getObserveConfig(deployment); + public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf) + throws Exception { var deploymentStatus = deployment.getStatus(); var jobIdString = deploymentStatus.getJobStatus().getJobId(); var jobId = jobIdString != null 
? JobID.fromHexString(jobIdString) : null; @@ -489,8 +489,15 @@ public class FlinkService { .toSeconds()); } - public Optional<String> cancelSessionJob( - JobID jobID, UpgradeMode upgradeMode, Configuration conf) throws Exception { + public void cancelSessionJob( + FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf) + throws Exception { + + var jobStatus = sessionJob.getStatus().getJobStatus(); + var jobIdString = jobStatus.getJobId(); + Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null"); + var jobId = JobID.fromHexString(jobIdString); + Optional<String> savepointOpt = Optional.empty(); try (ClusterClient<String> clusterClient = getClusterClient(conf)) { final String clusterId = clusterClient.getClusterId(); @@ -498,7 +505,7 @@ public class FlinkService { case STATELESS: LOG.info("Cancelling job."); clusterClient - .cancel(jobID) + .cancel(jobId) .get( configManager .getOperatorConfiguration() @@ -519,7 +526,7 @@ public class FlinkService { String savepoint = clusterClient .stopWithSavepoint( - jobID, + jobId, false, savepointDirectory, conf.get(FLINK_VERSION) @@ -535,7 +542,7 @@ public class FlinkService { String.format( "Timed out stopping the job %s in Flink cluster %s with savepoint, " + "please configure a larger timeout via '%s'", - jobID, + jobId, clusterId, ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()), exception); @@ -546,7 +553,13 @@ public class FlinkService { throw new RuntimeException("Unsupported upgrade mode " + upgradeMode); } } - return savepointOpt; + + jobStatus.setState(JobStatus.FINISHED.name()); + savepointOpt.ifPresent( + location -> { + Savepoint sp = Savepoint.of(location, SavepointTriggerType.UPGRADE); + jobStatus.getSavepointInfo().updateLastSavepoint(sp); + }); } public void triggerSavepoint( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index 8448b3d..012d789 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -318,7 +318,7 @@ public class DefaultValidator implements FlinkResourceValidator { && deployment.getStatus().getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING && ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously( - deployment, configManager)) { + deployment, configManager.getObserveConfig(deployment))) { return Optional.of( String.format( "Job could not be upgraded to last-state while config key[%s] is not set", diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index c1b4e8e..0e29516 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -28,10 +28,9 @@ import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; -import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.Savepoint; @@ -85,7 +84,6 @@ public class TestingFlinkService extends FlinkService { private final List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>(); private final Map<JobID, String> jobErrors = new HashMap<>(); - private final Map<JobID, SubmittedJobInfo> sessionJobs = new HashMap<>(); private final Set<String> sessions = new HashSet<>(); private boolean isPortReady = true; private boolean haDataAvailable = true; @@ -138,7 +136,6 @@ public class TestingFlinkService extends FlinkService { public void clear() { jobs.clear(); sessions.clear(); - sessionJobs.clear(); } @Override @@ -185,16 +182,24 @@ public class TestingFlinkService extends FlinkService { @Override public JobID submitJobToSessionCluster( - FlinkSessionJob sessionJob, Configuration conf, @Nullable String savepoint) { + ObjectMeta meta, + FlinkSessionJobSpec spec, + Configuration conf, + @Nullable String savepoint) + throws Exception { + + if (deployFailure) { + throw new Exception("Deployment failure"); + } JobID jobID = new JobID(); JobStatusMessage jobStatusMessage = new JobStatusMessage( jobID, - sessionJob.getMetadata().getName(), + conf.getString(KubernetesConfigOptions.CLUSTER_ID), JobStatus.RUNNING, System.currentTimeMillis()); - sessionJob.getStatus().getJobStatus().setJobId(jobID.toHexString()); - sessionJobs.put(jobID, new SubmittedJobInfo(savepoint, jobStatusMessage, conf)); + + jobs.add(Tuple2.of(savepoint, jobStatusMessage)); return jobID; } @@ -218,24 +223,6 @@ public class TestingFlinkService extends FlinkService { return jobs.stream().filter(t -> !t.f1.getJobState().isTerminalState()).count(); } - public Map<JobID, SubmittedJobInfo> listSessionJobs() { - return new HashMap<>(sessionJobs); - } - - @Override - public Optional<String> cancelSessionJob( - JobID jobID, 
UpgradeMode upgradeMode, Configuration conf) throws Exception { - if (sessionJobs.remove(jobID) == null) { - throw new Exception("Job not found"); - } - - if (upgradeMode == UpgradeMode.SAVEPOINT) { - return Optional.of("savepoint_" + savepointCounter++); - } else { - return Optional.empty(); - } - } - @Override public void triggerSavepoint( String jobId, @@ -275,12 +262,8 @@ public class TestingFlinkService extends FlinkService { .equals(KubernetesDeploymentTarget.APPLICATION.getName())) { throw new RuntimeException("Trying to list a job without submitting it"); } - var lists = jobs.stream().map(t -> t.f1).collect(Collectors.toList()); - lists.addAll( - sessionJobs.values().stream() - .map(t -> t.jobStatusMessage) - .collect(Collectors.toList())); - return CompletableFuture.completedFuture(lists); + return CompletableFuture.completedFuture( + jobs.stream().map(t -> t.f1).collect(Collectors.toList())); }); clusterClient.setStopWithSavepointFunction( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java index 380d66d..719420c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java @@ -62,7 +62,9 @@ public class ArtifactManagerTest { @Test public void testGenerateJarDir() { - String baseDir = artifactManager.generateJarDir(TestUtils.buildSessionJob()); + var sessionJob = TestUtils.buildSessionJob(); + String baseDir = + artifactManager.generateJarDir(sessionJob.getMetadata(), sessionJob.getSpec()); String expected = tempDir.toString() + File.separator diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java index 9f4a315..a73e000 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java @@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.operator.TestingStatusRecorder; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; -import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler; +import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.SavepointUtils; @@ -47,7 +47,7 @@ public class SessionJobObserverTest { private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration()); private final TestingFlinkService flinkService = new TestingFlinkService(); private SessionJobObserver observer; - private FlinkSessionJobReconciler reconciler; + private SessionJobReconciler reconciler; @BeforeEach public void before() { @@ -55,7 +55,9 @@ public class SessionJobObserverTest { TestingStatusRecorder<FlinkSessionJobStatus> statusRecorder = new TestingStatusRecorder<>(); observer = new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder); - reconciler = new FlinkSessionJobReconciler(kubernetesClient, flinkService, configManager); + reconciler = + new SessionJobReconciler( + kubernetesClient, flinkService, configManager, eventRecorder); } @Test diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index ca9d819..18c847c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptio import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; -import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; @@ -489,21 +488,18 @@ public class ApplicationReconcilerTest { ObjectMeta deployMeta = flinkApp.getMetadata(); FlinkDeploymentStatus status = flinkApp.getStatus(); FlinkDeploymentSpec spec = flinkApp.getSpec(); - JobSpec jobSpec = spec.getJob(); Configuration deployConfig = configManager.getDeployConfig(deployMeta, spec); status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name()); status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); - reconciler.deployFlinkJob( - deployMeta, jobSpec, status, deployConfig, Optional.empty(), false); + reconciler.deploy(deployMeta, spec, status, deployConfig, Optional.empty(), false); String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH); Assertions.assertTrue(path1.startsWith(haStoragePath)); 
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name()); status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); - reconciler.deployFlinkJob( - deployMeta, jobSpec, status, deployConfig, Optional.empty(), false); + reconciler.deploy(deployMeta, spec, status, deployConfig, Optional.empty(), false); String path2 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH); Assertions.assertTrue(path2.startsWith(haStoragePath)); assertNotEquals(path1, path2); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java similarity index 78% rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index 8167292..6d1c414 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.reconciler.sessionjob; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; @@ -29,14 +30,18 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; import 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.SavepointUtils; +import org.apache.flink.runtime.client.JobStatusMessage; import io.javaoperatorsdk.operator.api.reconciler.Context; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import javax.annotation.Nullable; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -45,132 +50,132 @@ import static org.junit.Assert.assertNull; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -/** Tests for {@link FlinkSessionJobReconciler}. */ -public class FlinkSessionJobReconcilerTest { +/** Tests for {@link SessionJobReconciler}. */ +public class SessionJobReconcilerTest { private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration()); + private TestingFlinkService flinkService = new TestingFlinkService(); + private SessionJobReconciler reconciler; + + @BeforeEach + public void before() { + var eventRecorder = new EventRecorder(null, (r, e) -> {}); + reconciler = new SessionJobReconciler(null, flinkService, configManager, eventRecorder); + } @Test public void testSubmitAndCleanUp() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); // session not found reconciler.reconcile(sessionJob, TestUtils.createEmptyContext()); - assertEquals(0, flinkService.listSessionJobs().size()); + assertEquals(0, flinkService.listJobs().size()); // session not ready reconciler.reconcile(sessionJob, TestUtils.createContextWithNotReadyFlinkDeployment()); - assertEquals(0, flinkService.listSessionJobs().size()); + 
assertEquals(0, flinkService.listJobs().size()); // session ready reconciler.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); - assertEquals(1, flinkService.listSessionJobs().size()); + assertEquals(1, flinkService.listJobs().size()); verifyAndSetRunningJobsToStatus( sessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); // clean up reconciler.cleanup(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); - assertEquals(0, flinkService.listSessionJobs().size()); + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + flinkService.listJobs().get(0).f1.getJobState()); } @Test public void testRestart() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); // session ready reconciler.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); - assertEquals(1, flinkService.listSessionJobs().size()); + assertEquals(1, flinkService.listJobs().size()); verifyAndSetRunningJobsToStatus( sessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); sessionJob.getSpec().setRestartNonce(2L); reconciler.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); - assertEquals(0, flinkService.listSessionJobs().size()); + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + flinkService.listJobs().get(0).f1.getJobState()); reconciler.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); verifyAndSetRunningJobsToStatus( sessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); } @Test public 
void testSubmitWithInitialSavepointPath() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); var initSavepointPath = "file:///init-sp"; sessionJob.getSpec().getJob().setInitialSavepointPath(initSavepointPath); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); reconciler.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); verifyAndSetRunningJobsToStatus( sessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), initSavepointPath, - flinkService.listSessionJobs()); + flinkService.listJobs()); } @Test public void testStatelessUpgrade() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); var readyContext = TestUtils.createContextWithReadyFlinkDeployment(); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); reconciler.reconcile(sessionJob, readyContext); - assertEquals(1, flinkService.listSessionJobs().size()); + assertEquals(1, flinkService.listJobs().size()); verifyAndSetRunningJobsToStatus( sessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); var statelessSessionJob = ReconciliationUtils.clone(sessionJob); statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); statelessSessionJob.getSpec().getJob().setParallelism(2); // job suspended first reconciler.reconcile(statelessSessionJob, readyContext); - assertTrue(flinkService.listSessionJobs().isEmpty()); - verifyJobState(statelessSessionJob, JobState.SUSPENDED, JobState.SUSPENDED.name()); + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + flinkService.listJobs().get(0).f1.getJobState()); + verifyJobState(statelessSessionJob, 
JobState.SUSPENDED, "FINISHED"); + flinkService.clear(); reconciler.reconcile(statelessSessionJob, readyContext); - assertEquals(1, flinkService.listSessionJobs().size()); + assertEquals(1, flinkService.listJobs().size()); verifyAndSetRunningJobsToStatus( statelessSessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); } @Test public void testSavepointUpgrade() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); var readyContext = TestUtils.createContextWithReadyFlinkDeployment(); reconciler.reconcile(sessionJob, readyContext); // start the job - assertEquals(1, flinkService.listSessionJobs().size()); + assertEquals(1, flinkService.listJobs().size()); assertTrue( sessionJob .getStatus() @@ -183,11 +188,21 @@ public class FlinkSessionJobReconcilerTest { var statefulSessionJob = ReconciliationUtils.clone(sessionJob); statefulSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); statefulSessionJob.getSpec().getJob().setParallelism(3); + + verifyAndSetRunningJobsToStatus( + statefulSessionJob, + JobState.RUNNING, + org.apache.flink.api.common.JobStatus.RECONCILING.name(), + null, + flinkService.listJobs()); + reconciler.reconcile(statefulSessionJob, readyContext); // job suspended first - assertTrue(flinkService.listSessionJobs().isEmpty()); - verifyJobState(statefulSessionJob, JobState.SUSPENDED, JobState.SUSPENDED.name()); + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + flinkService.listJobs().get(0).f1.getJobState()); + verifyJobState(statefulSessionJob, JobState.SUSPENDED, "FINISHED"); assertEquals( 1, statefulSessionJob @@ -205,52 +220,31 @@ public class FlinkSessionJobReconcilerTest { .getLastSavepoint() .getTriggerType()); + 
flinkService.clear(); // upgraded reconciler.reconcile(statefulSessionJob, readyContext); - assertEquals(1, flinkService.listSessionJobs().size()); + assertEquals(1, flinkService.listJobs().size()); verifyAndSetRunningJobsToStatus( statefulSessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), "savepoint_0", - flinkService.listSessionJobs()); - } - - @Test - public void testUseTheEffectiveConfigToSubmit() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); - FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); - - var readyContext = - TestUtils.createContextWithReadyFlinkDeployment(Map.of("key", "newValue")); - - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); - - reconciler.reconcile(sessionJob, readyContext); - - assertEquals(1, flinkService.listSessionJobs().size()); - var submittedJob = - verifyAndReturnTheSubmittedJob(sessionJob, flinkService.listSessionJobs()); - assertEquals("newValue", submittedJob.effectiveConfig.getString("key", null)); + flinkService.listJobs()); } @Test public void testTriggerSavepoint() throws Exception { - TestingFlinkService flinkService = new TestingFlinkService(); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); assertFalse(SavepointUtils.savepointInProgress(sessionJob.getStatus().getJobStatus())); var readyContext = TestUtils.createContextWithReadyFlinkDeployment(); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); reconciler.reconcile(sessionJob, readyContext); verifyAndSetRunningJobsToStatus( sessionJob, JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); assertFalse(SavepointUtils.savepointInProgress(sessionJob.getStatus().getJobStatus())); @@ -344,7 +338,7 @@ public class FlinkSessionJobReconcilerTest { JobState.RUNNING, 
org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger(); ReconciliationUtils.updateLastReconciledSavepointTriggerNonce( @@ -367,11 +361,11 @@ public class FlinkSessionJobReconcilerTest { assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus())); } - private TestingFlinkService.SubmittedJobInfo verifyAndReturnTheSubmittedJob( - FlinkSessionJob sessionJob, - Map<JobID, TestingFlinkService.SubmittedJobInfo> sessionJobs) { + private Tuple2<String, JobStatusMessage> verifyAndReturnTheSubmittedJob( + FlinkSessionJob sessionJob, List<Tuple2<String, JobStatusMessage>> jobs) { var jobID = JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId()); - var submittedJobInfo = sessionJobs.get(jobID); + var submittedJobInfo = + jobs.stream().filter(t -> t.f1.getJobId().equals(jobID)).findAny().get(); Assertions.assertNotNull(submittedJobInfo); return submittedJobInfo; } @@ -381,14 +375,14 @@ public class FlinkSessionJobReconcilerTest { JobState expectedState, String jobStatusObserved, @Nullable String expectedSavepointPath, - Map<JobID, TestingFlinkService.SubmittedJobInfo> sessionJobs) { + List<Tuple2<String, JobStatusMessage>> jobs) { - var submittedJobInfo = verifyAndReturnTheSubmittedJob(sessionJob, sessionJobs); - assertEquals(expectedSavepointPath, submittedJobInfo.savepointPath); + var submittedJobInfo = verifyAndReturnTheSubmittedJob(sessionJob, jobs); + assertEquals(expectedSavepointPath, submittedJobInfo.f0); verifyJobState(sessionJob, expectedState, jobStatusObserved); JobStatus jobStatus = sessionJob.getStatus().getJobStatus(); - jobStatus.setJobName(submittedJobInfo.jobStatusMessage.getJobName()); + jobStatus.setJobName(submittedJobInfo.f1.getJobName()); jobStatus.setState("RUNNING"); } @@ -409,9 +403,6 @@ public class FlinkSessionJobReconcilerTest { @Test public void 
testJobUpgradeIgnorePendingSavepoint() throws Exception { Context readyContext = TestUtils.createContextWithReadyFlinkDeployment(); - TestingFlinkService flinkService = new TestingFlinkService(); - FlinkSessionJobReconciler reconciler = - new FlinkSessionJobReconciler(null, flinkService, configManager); FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); reconciler.reconcile(sessionJob, readyContext); verifyAndSetRunningJobsToStatus( @@ -419,7 +410,7 @@ public class FlinkSessionJobReconcilerTest { JobState.RUNNING, org.apache.flink.api.common.JobStatus.RECONCILING.name(), null, - flinkService.listSessionJobs()); + flinkService.listJobs()); FlinkSessionJob spSessionJob = ReconciliationUtils.clone(sessionJob); spSessionJob @@ -439,12 +430,14 @@ public class FlinkSessionJobReconcilerTest { .key(), "true"))); // Force upgrade when savepoint is in progress. - reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager); + reconciler = + new SessionJobReconciler( + null, flinkService, configManager, new EventRecorder(null, null)); spSessionJob.getSpec().getJob().setParallelism(100); reconciler.reconcile(spSessionJob, readyContext); assertEquals( "trigger_0", spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId()); - assertEquals(JobState.SUSPENDED.name(), spSessionJob.getStatus().getJobStatus().getState()); + assertEquals("FINISHED", spSessionJob.getStatus().getJobStatus().getState()); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java index f5afb2b..4b515aa 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java @@ -68,6 +68,7 @@ import static 
org.junit.jupiter.api.Assertions.fail; public class FlinkServiceTest { KubernetesClient client; private final Configuration configuration = new Configuration(); + private final FlinkConfigManager configManager = new FlinkConfigManager(configuration); @BeforeEach public void setup() { @@ -98,7 +99,8 @@ public class FlinkServiceTest { deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); deployment.getStatus().getJobStatus().setState("RUNNING"); - flinkService.cancelJob(deployment, UpgradeMode.STATELESS); + flinkService.cancelJob( + deployment, UpgradeMode.STATELESS, configManager.getObserveConfig(deployment)); assertTrue(cancelFuture.isDone()); assertEquals(jobID, cancelFuture.get()); assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); @@ -134,7 +136,8 @@ public class FlinkServiceTest { ReconciliationUtils.updateForSpecReconciliationSuccess( deployment, JobState.RUNNING, new Configuration()); - flinkService.cancelJob(deployment, UpgradeMode.SAVEPOINT); + flinkService.cancelJob( + deployment, UpgradeMode.SAVEPOINT, configManager.getObserveConfig(deployment)); assertTrue(stopWithSavepointFuture.isDone()); assertEquals(jobID, stopWithSavepointFuture.get().f0); assertFalse(stopWithSavepointFuture.get().f1); @@ -166,7 +169,8 @@ public class FlinkServiceTest { JobStatus jobStatus = deployment.getStatus().getJobStatus(); jobStatus.setJobId(jobID.toHexString()); - flinkService.cancelJob(deployment, UpgradeMode.LAST_STATE); + flinkService.cancelJob( + deployment, UpgradeMode.LAST_STATE, configManager.getObserveConfig(deployment)); assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); assertNull( client.apps()
