This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 3a49555e49df780c4efa8ecd4b679c2895766e29
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jun 21 14:43:58 2022 +0200

    [FLINK-28180] Unify Application and SessionJob reconciler logic
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   5 +-
 .../operator/artifact/ArtifactManager.java         |  12 +-
 .../operator/config/FlinkConfigManager.java        |   8 +-
 .../observer/sessionjob/SessionJobObserver.java    |   9 +-
 .../operator/reconciler/ReconciliationUtils.java   |  76 +----
 .../deployment/AbstractDeploymentReconciler.java   |  95 ------
 .../AbstractFlinkResourceReconciler.java           | 355 +++++++++++++++++++++
 .../deployment/AbstractJobReconciler.java          | 211 ++++++++++++
 .../deployment/ApplicationReconciler.java          | 253 ++++-----------
 .../reconciler/deployment/SessionReconciler.java   | 122 +++----
 .../sessionjob/FlinkSessionJobReconciler.java      | 211 ------------
 .../reconciler/sessionjob/SessionJobHelper.java    |  61 ----
 .../sessionjob/SessionJobReconciler.java           | 147 +++++++++
 .../kubernetes/operator/service/FlinkService.java  |  57 ++--
 .../operator/validation/DefaultValidator.java      |   2 +-
 .../kubernetes/operator/TestingFlinkService.java   |  47 +--
 .../operator/artifact/ArtifactManagerTest.java     |   4 +-
 .../sessionjob/SessionJobObserverTest.java         |   8 +-
 .../deployment/ApplicationReconcilerTest.java      |   8 +-
 ...ilerTest.java => SessionJobReconcilerTest.java} | 145 ++++-----
 .../operator/service/FlinkServiceTest.java         |  10 +-
 21 files changed, 1001 insertions(+), 845 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 5e93866..1b0daea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -33,7 +33,7 @@ import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
-import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -132,8 +132,9 @@ public class FlinkOperator {
     }
 
     private void registerSessionJobController() {
-        var reconciler = new FlinkSessionJobReconciler(client, flinkService, configManager);
         var eventRecorder = EventRecorder.create(client, listeners);
+        var reconciler =
+                new SessionJobReconciler(client, flinkService, configManager, eventRecorder);
         var statusRecorder = StatusRecorder.<FlinkSessionJobStatus>create(client, listeners);
         var observer =
                 new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java
index fa9cf98..ec370de 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManager.java
@@ -19,9 +19,10 @@ package org.apache.flink.kubernetes.operator.artifact;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,15 +65,14 @@ public class ArtifactManager {
         }
     }
 
-    public String generateJarDir(FlinkSessionJob sessionJob) {
+    public String generateJarDir(ObjectMeta meta, FlinkSessionJobSpec spec) {
         return String.join(
                 File.separator,
                 new String[] {
                     new File(configManager.getOperatorConfiguration().getArtifactsBaseDir())
-                            .getAbsolutePath(),
-                    sessionJob.getMetadata().getNamespace(),
-                    sessionJob.getSpec().getDeploymentName(),
-                    sessionJob.getMetadata().getName()
+                                    .getAbsolutePath(),
+                            meta.getNamespace(),
+                    spec.getDeploymentName(), meta.getName()
                 });
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 0895f29..e7fbbfc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
@@ -43,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -161,12 +160,11 @@ public class FlinkConfigManager {
     }
 
     public Configuration getSessionJobConfig(
-            FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+            FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) {
         Configuration sessionJobConfig = getObserveConfig(deployment);
 
         // merge session job specific config
-        Map<String, String> sessionJobFlinkConfiguration =
-                flinkSessionJob.getSpec().getFlinkConfiguration();
+        var sessionJobFlinkConfiguration = sessionJobSpec.getFlinkConfiguration();
         if (sessionJobFlinkConfiguration != null) {
             sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 6169a4d..78922f1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -26,7 +26,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
 import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
-import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
+import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -103,13 +103,12 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
 
         Optional<FlinkDeployment> flinkDepOpt = context.getSecondaryResource(FlinkDeployment.class);
 
-        var helper = new SessionJobHelper(flinkSessionJob, LOG);
-
-        if (!helper.sessionClusterReady(flinkDepOpt)) {
+        if (!SessionJobReconciler.sessionClusterReady(flinkDepOpt)) {
             return;
         }
 
-        var deployedConfig = configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob);
+        var deployedConfig =
+                configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
         var jobFound =
                 jobStatusObserver.observe(
                         flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 63d0f15..ddbc79f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -19,18 +19,14 @@ package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
@@ -38,7 +34,6 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.util.Preconditions;
@@ -55,8 +50,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.time.Duration;
-import java.time.Instant;
 import java.util.Optional;
 
 /** Reconciliation utilities. */
@@ -197,18 +190,19 @@ public class ReconciliationUtils {
     }
 
     public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(
-            FlinkDeployment flinkApp, FlinkConfigManager configManager) {
+            AbstractFlinkResource<?, ?> flinkApp, Configuration observeConfig) {
 
-        FlinkDeploymentSpec deployedSpec = getDeployedSpec(flinkApp);
+        var deployedSpec = getDeployedSpec(flinkApp);
         UpgradeMode previousUpgradeMode = deployedSpec.getJob().getUpgradeMode();
         UpgradeMode currentUpgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
 
         return previousUpgradeMode != UpgradeMode.LAST_STATE
                 && currentUpgradeMode == UpgradeMode.LAST_STATE
-                && !FlinkUtils.isKubernetesHAActivated(configManager.getObserveConfig(flinkApp));
+                && !FlinkUtils.isKubernetesHAActivated(observeConfig);
     }
 
-    public static FlinkDeploymentSpec getDeployedSpec(FlinkDeployment deployment) {
+    public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(
+            AbstractFlinkResource<SPEC, ?> deployment) {
         var reconciliationStatus = deployment.getStatus().getReconciliationStatus();
         var reconciliationState = reconciliationStatus.getState();
         if (reconciliationState != ReconciliationState.ROLLED_BACK) {
@@ -267,66 +261,6 @@ public class ReconciliationUtils {
         }
     }
 
-    public static boolean shouldRollBack(
-            FlinkService flinkService,
-            ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus,
-            Configuration configuration) {
-
-        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
-            return true;
-        }
-
-        if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
-                || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK
-                || reconciliationStatus.isLastReconciledSpecStable()) {
-            return false;
-        }
-
-        var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
-        if (lastStableSpec != null
-                && lastStableSpec.getJob() != null
-                && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
-            // Should not roll back to suspended state
-            return false;
-        }
-
-        Duration readinessTimeout =
-                configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
-        if (!Instant.now()
-                .minus(readinessTimeout)
-                .isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp()))) {
-            return false;
-        }
-
-        var haDataAvailable = flinkService.isHaMetadataAvailable(configuration);
-        if (!haDataAvailable) {
-            LOG.warn("Rollback is not possible due to missing HA metadata");
-        }
-        return haDataAvailable;
-    }
-
-    public static boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment deployment) {
-
-        if (!ReconciliationUtils.jmMissingForRunningDeployment(deployment)
-                || !conf.get(
-                        KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) {
-            return false;
-        }
-
-        if (!FlinkUtils.isKubernetesHAActivated(conf)) {
-            LOG.warn("Could not recover lost deployment without HA enabled");
-            return false;
-        }
-        return true;
-    }
-
-    private static boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
-        var deployedJob = getDeployedSpec(deployment).getJob();
-        return (deployedJob == null || deployedJob.getState() == JobState.RUNNING)
-                && (deployment.getStatus().getJobManagerDeploymentStatus()
-                        == JobManagerDeploymentStatus.MISSING);
-    }
-
     public static boolean isJobInTerminalState(CommonStatus<?> status) {
         var jobState = status.getJobStatus().getState();
         return org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
deleted file mode 100644
index 16d7b44..0000000
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.reconciler.deployment;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** BaseReconciler with functionality that is common to job and session modes. */
-public abstract class AbstractDeploymentReconciler implements Reconciler<FlinkDeployment> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractDeploymentReconciler.class);
-
-    protected final FlinkConfigManager configManager;
-    protected final EventRecorder eventRecorder;
-    protected final KubernetesClient kubernetesClient;
-    protected final FlinkService flinkService;
-
-    public AbstractDeploymentReconciler(
-            KubernetesClient kubernetesClient,
-            FlinkService flinkService,
-            FlinkConfigManager configManager,
-            EventRecorder eventRecorder) {
-        this.kubernetesClient = kubernetesClient;
-        this.flinkService = flinkService;
-        this.configManager = configManager;
-        this.eventRecorder = eventRecorder;
-    }
-
-    @Override
-    public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        shutdown(flinkApp);
-        return DeleteControl.defaultDelete();
-    }
-
-    protected boolean initiateRollBack(FlinkDeploymentStatus status) {
-        ReconciliationStatus<?> reconciliationStatus = status.getReconciliationStatus();
-        if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) {
-            LOG.warn("Preparing to roll back to last stable spec.");
-            if (StringUtils.isEmpty(status.getError())) {
-                status.setError(
-                        "Deployment is not ready within the configured timeout, rolling back.");
-            }
-            reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
-            return true;
-        }
-        return false;
-    }
-
-    protected boolean newSpecIsAlreadyDeployed(FlinkDeployment flinkApp, Configuration deployConf) {
-        FlinkDeploymentSpec deployedSpec = ReconciliationUtils.getDeployedSpec(flinkApp);
-        if (flinkApp.getSpec().equals(deployedSpec)) {
-            LOG.info(
-                    "The new spec matches the currently deployed last stable spec. No upgrade needed.");
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkApp,
-                    deployedSpec.getJob() != null ? deployedSpec.getJob().getState() : null,
-                    deployConf);
-            return true;
-        }
-        return false;
-    }
-
-    protected abstract void shutdown(FlinkDeployment flinkApp);
-}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
new file mode 100644
index 0000000..2e0d592
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Base class for all Flink resource reconcilers. It contains the general flow of reconciling Flink
+ * related resources including initial deployments, upgrades, rollbacks etc.
+ */
+public abstract class AbstractFlinkResourceReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        implements Reconciler<CR> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractFlinkResourceReconciler.class);
+
+    protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public AbstractFlinkResourceReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
+    }
+
+    @Override
+    public final void reconcile(CR cr, Context ctx) throws Exception {
+        var spec = cr.getSpec();
+        var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
+        var status = cr.getStatus();
+
+        // If the resource is not ready for reconciliation we simply return
+        if (!readyToReconcile(cr, ctx, deployConfig)) {
+            LOG.info("Not ready for reconciliation yet...");
+            return;
+        }
+
+        var firstDeployment = status.getReconciliationStatus().getLastReconciledSpec() == null;
+
+        // If this is the first deployment for the resource we simply submit the job and return.
+        // No further logic is required at this point.
+        if (firstDeployment) {
+            LOG.info("Deploying for the first time");
+            deploy(
+                    cr.getMetadata(),
+                    spec,
+                    status,
+                    deployConfig,
+                    Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
+                    false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    cr, JobState.RUNNING, deployConfig);
+            return;
+        }
+
+        var reconciliationStatus = cr.getStatus().getReconciliationStatus();
+        SPEC lastReconciledSpec =
+                cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = cr.getSpec();
+
+        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
+        var observeConfig = getObserveConfig(cr, ctx);
+
+        if (specChanged) {
+            if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
+                return;
+            }
+            LOG.info("Reconciling spec change");
+            reconcileSpecChange(cr, observeConfig, deployConfig);
+        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+            // Rollbacks are executed in two steps, we initiate it first then return
+            if (initiateRollBack(status)) {
+                return;
+            }
+            LOG.warn("Executing rollback operation");
+            rollback(cr, ctx, observeConfig);
+        } else if (!reconcileOtherChanges(cr, observeConfig)) {
+            LOG.info("Resource fully reconciled, nothing to do...");
+        }
+    }
+
+    /**
+     * Get Flink configuration object for deploying the given spec using {@link #deploy}.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec for which the config should be created.
+     * @param ctx Reconciliation context.
+     * @return Deployment configuration.
+     */
+    protected abstract Configuration getDeployConfig(ObjectMeta meta, SPEC spec, Context ctx);
+
+    /**
+     * Get Flink configuration for client interactions with the running Flink deployment/session
+     * job.
+     *
+     * @param resource Related Flink resource.
+     * @param context Reconciliation context.
+     * @return Observe configuration.
+     */
+    protected abstract Configuration getObserveConfig(CR resource, Context context);
+
+    /**
+     * Check whether the given Flink resource is ready to be reconciled or we are still waiting for
+     * any pending operation or condition first.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param deployConfig Deployment configuration.
+     * @return True if the resource is ready to be reconciled.
+     */
+    protected abstract boolean readyToReconcile(CR cr, Context ctx, Configuration deployConfig);
+
+    /**
+     * Reconcile spec upgrade on the currently deployed/suspended Flink resource and update the
+     * status accordingly.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Deployment configuration.
+     * @throws Exception
+     */
+    protected abstract void reconcileSpecChange(
+            CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception;
+
+    /**
+     * Rollback deployed resource to the last stable spec.
+     *
+     * @param cr Related Flink resource.
+     * @param ctx Reconciliation context.
+     * @param observeConfig Observe configuration.
+     * @throws Exception
+     */
+    protected abstract void rollback(CR cr, Context ctx, Configuration observeConfig)
+            throws Exception;
+
+    /**
+     * Reconcile any other changes required for this resource that are specific to the reconciler
+     * implementation.
+     *
+     * @param cr Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @return True if any further reconciliation action was taken.
+     * @throws Exception
+     */
+    protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig)
+            throws Exception;
+
+    @Override
+    public final DeleteControl cleanup(CR resource, Context context) {
+        return cleanupInternal(resource, context);
+    }
+
+    /**
+     * Deploys the target resource spec to Kubernetes.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed to Kubernetes.
+     * @param status Status object of the resource
+     * @param deployConfig Flink conf for the deployment.
+     * @param savepoint Optional savepoint path for applications and session jobs.
+     * @param requireHaMetadata Flag used by application deployments to validate HA metadata
+     * @throws Exception
+     */
+    protected abstract void deploy(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception;
+
+    /**
+     * Shut down and clean up all Flink job/cluster resources.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @return DeleteControl object.
+     */
+    protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+
+    /**
+     * Checks whether the desired spec already matches the currently deployed spec. If they match
+     * the resource status is updated to reflect successful reconciliation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConf Deploy configuration for the Flink resource.
+     * @return True if desired spec was already deployed.
+     */
+    private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration 
deployConf) {
+        AbstractFlinkSpec deployedSpec = 
ReconciliationUtils.getDeployedSpec(resource);
+        if (resource.getSpec().equals(deployedSpec)) {
+            LOG.info(
+                    "The new spec matches the currently deployed last stable 
spec. No upgrade needed.");
+            ReconciliationUtils.updateForSpecReconciliationSuccess(
+                    resource,
+                    deployedSpec.getJob() != null ? 
deployedSpec.getJob().getState() : null,
+                    deployConf);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the currently deployed Flink resource spec should be rolled back to the stable
+     * spec. This includes validating the current deployment status, config and checking if the last
+     * reconciled spec did not become stable within the configured grace period.
+     *
+     * <p>Rollbacks are only supported to previously running resource specs with HA enabled.
+     *
+     * <p>Decision order: a rollback that is already in progress ({@code ROLLING_BACK}) always
+     * continues. A rollback is not started when rollbacks are disabled in the configuration, the
+     * state is already {@code ROLLED_BACK}, the last reconciled spec is stable, the last stable spec
+     * has a job in {@code SUSPENDED} state, or the readiness timeout has not yet elapsed since the
+     * reconciliation timestamp. Otherwise the result is whether HA metadata is available; a warning
+     * is logged when it is not.
+     *
+     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param configuration Flink cluster configuration.
+     * @return True if the resource should be rolled back, false otherwise.
+     */
+    private boolean shouldRollBack(
+            ReconciliationStatus<SPEC> reconciliationStatus, Configuration 
configuration) {
+
+        if (reconciliationStatus.getState() == 
ReconciliationState.ROLLING_BACK) {
+            return true;
+        }
+
+        if 
(!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
+                || reconciliationStatus.getState() == 
ReconciliationState.ROLLED_BACK
+                || reconciliationStatus.isLastReconciledSpecStable()) {
+            return false;
+        }
+
+        var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
+        if (lastStableSpec != null
+                && lastStableSpec.getJob() != null
+                && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
+            // Should not roll back to suspended state
+            return false;
+        }
+
+        Duration readinessTimeout =
+                
configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
+        if (!Instant.now()
+                .minus(readinessTimeout)
+                
.isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())))
 {
+            return false;
+        }
+
+        var haDataAvailable = 
flinkService.isHaMetadataAvailable(configuration);
+        if (!haDataAvailable) {
+            LOG.warn("Rollback is not possible due to missing HA metadata");
+        }
+        return haDataAvailable;
+    }
+
+    /**
+     * Initiate rollback process by changing the {@link ReconciliationState} in the status.
+     *
+     * <p>If the state is not yet {@code ROLLING_BACK}, it is switched to {@code ROLLING_BACK}. A
+     * default error message is written to the status only when no error is currently set, so an
+     * existing error is preserved. If the state is already {@code ROLLING_BACK} nothing is changed.
+     *
+     * @param status Resource status.
+     * @return True if a new rollback was initiated.
+     */
+    private boolean initiateRollBack(STATUS status) {
+        var reconciliationStatus = status.getReconciliationStatus();
+        if (reconciliationStatus.getState() != 
ReconciliationState.ROLLING_BACK) {
+            LOG.warn("Preparing to roll back to last stable spec.");
+            if (StringUtils.isEmpty(status.getError())) {
+                status.setError(
+                        "Deployment is not ready within the configured 
timeout, rolling back.");
+            }
+            reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the JobManager Kubernetes Deployment recovery logic should be initiated.
+     * Recovery is triggered only if all of the following hold: the JobManager deployment is missing
+     * for a deployment that is expected to be running, the JM deployment recovery option is enabled
+     * in the configuration, and Kubernetes HA is activated. When only the HA condition fails, a
+     * warning is logged. This logic is only used by the Session and Application reconcilers.
+     *
+     * @param conf Flink cluster configuration.
+     * @param deployment FlinkDeployment object.
+     * @return True if recovery should be executed.
+     */
+    protected static boolean shouldRecoverDeployment(
+            Configuration conf, FlinkDeployment deployment) {
+
+        if (!jmMissingForRunningDeployment(deployment)
+                || !conf.get(
+                        
KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED)) {
+            return false;
+        }
+
+        if (!FlinkUtils.isKubernetesHAActivated(conf)) {
+            LOG.warn("Could not recover lost deployment without HA enabled");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether the JobManager deployment is missing although the deployment should be up.
+     *
+     * @param deployment FlinkDeployment object.
+     * @return True if the JobManager deployment status is {@code MISSING} and the deployed spec
+     *     either has no job section or its job state is {@code RUNNING}.
+     */
+    private static boolean jmMissingForRunningDeployment(FlinkDeployment 
deployment) {
+        var deployedJob = 
ReconciliationUtils.getDeployedSpec(deployment).getJob();
+        return (deployedJob == null || deployedJob.getState() == 
JobState.RUNNING)
+                && (deployment.getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.MISSING);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
new file mode 100644
index 0000000..3ca8735
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public abstract class AbstractJobReconciler<
+                CR extends AbstractFlinkResource<SPEC, STATUS>,
+                SPEC extends AbstractFlinkSpec,
+                STATUS extends CommonStatus<SPEC>>
+        extends AbstractFlinkResourceReconciler<CR, SPEC, STATUS> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJobReconciler.class);
+
+    /**
+     * Creates the reconciler; all arguments are forwarded unchanged to the superclass constructor.
+     *
+     * @param kubernetesClient Kubernetes client handed to the base reconciler.
+     * @param flinkService Flink service handed to the base reconciler.
+     * @param configManager Config manager handed to the base reconciler.
+     * @param eventRecorder Event recorder handed to the base reconciler.
+     */
+    public AbstractJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
+    }
+
+    /**
+     * Decides whether reconciliation may proceed. Returns false (and logs the delay) while a
+     * savepoint is in progress for the job, unless the configuration says pending savepoints are to
+     * be ignored during upgrades.
+     *
+     * <p>NOTE(review): the {@code deployConfig} argument is not read here; the deploy configuration
+     * is rebuilt through {@code getDeployConfig} instead. Confirm against the caller whether the
+     * passed-in configuration could be used directly.
+     *
+     * @param resource Resource being reconciled.
+     * @param context Current context.
+     * @param deployConfig Deploy configuration (currently unused by this implementation).
+     * @return False if reconciliation should be delayed, true otherwise.
+     */
+    @Override
+    public boolean readyToReconcile(CR resource, Context context, 
Configuration deployConfig) {
+        if (shouldWaitForPendingSavepoint(
+                resource.getStatus().getJobStatus(),
+                getDeployConfig(resource.getMetadata(), resource.getSpec(), 
context))) {
+            LOG.info("Delaying job reconciliation until pending savepoint is 
completed.");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether reconciliation has to wait for a savepoint.
+     *
+     * @param jobStatus Job status checked for a savepoint in progress.
+     * @param conf Configuration read for {@code JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT}.
+     * @return True if the ignore-pending-savepoint option is false and a savepoint is in progress.
+     */
+    private boolean shouldWaitForPendingSavepoint(JobStatus jobStatus, 
Configuration conf) {
+        return !conf.getBoolean(
+                        
KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT)
+                && SavepointUtils.savepointInProgress(jobStatus);
+    }
+
+    /**
+     * Applies a changed spec, driven by the job state of the last reconciled spec and the desired
+     * job state of the current spec.
+     *
+     * <p>If the last reconciled state is {@code RUNNING}, the job is cancelled with the upgrade mode
+     * returned by {@code getAvailableUpgradeMode} (which is also written into the current spec) and
+     * {@code SUSPENDED} is recorded. If no upgrade mode is available, the method returns without
+     * updating the reconciliation status. If the last reconciled state is {@code SUSPENDED} and the
+     * desired state is {@code RUNNING}, the job is restored and {@code RUNNING} is recorded.
+     *
+     * <p>Both branches test the state read at the start of the method, so a running job is only
+     * suspended by this invocation and is not restored within the same call.
+     *
+     * @param resource Resource being reconciled.
+     * @param observeConfig Observe configuration.
+     * @param deployConfig Deploy configuration.
+     * @throws Exception If cancelling or restoring the job fails.
+     */
+    @Override
+    protected void reconcileSpecChange(
+            CR resource, Configuration observeConfig, Configuration 
deployConfig) throws Exception {
+        var deployMeta = resource.getMetadata();
+        STATUS status = resource.getStatus();
+        var reconciliationStatus = status.getReconciliationStatus();
+        SPEC lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
+        SPEC currentDeploySpec = resource.getSpec();
+
+        JobState currentJobState = lastReconciledSpec.getJob().getState();
+        JobState desiredJobState = currentDeploySpec.getJob().getState();
+        JobState newState = currentJobState;
+        if (currentJobState == JobState.RUNNING) {
+            if (desiredJobState == JobState.RUNNING) {
+                LOG.info("Upgrading/Restarting running job, suspending 
first...");
+            }
+            Optional<UpgradeMode> availableUpgradeMode =
+                    getAvailableUpgradeMode(resource, deployConfig, 
observeConfig);
+            if (availableUpgradeMode.isEmpty()) {
+                return;
+            }
+            // We must record the upgrade mode used to the status later
+            
currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
+            cancelJob(resource, availableUpgradeMode.get(), observeConfig);
+            newState = JobState.SUSPENDED;
+        }
+        if (currentJobState == JobState.SUSPENDED && desiredJobState == 
JobState.RUNNING) {
+            restoreJob(
+                    deployMeta,
+                    currentDeploySpec,
+                    status,
+                    deployConfig,
+                    // We decide to enforce HA based on how job was previously 
suspended
+                    lastReconciledSpec.getJob().getUpgradeMode() == 
UpgradeMode.LAST_STATE);
+            newState = JobState.RUNNING;
+        }
+        ReconciliationUtils.updateForSpecReconciliationSuccess(resource, 
newState, deployConfig);
+    }
+
+    /**
+     * Determines which upgrade mode can currently be used to suspend the job.
+     *
+     * <ul>
+     *   <li>Spec upgrade mode is {@code STATELESS}: that mode is returned.
+     *   <li>Job is in a terminal state: {@code SAVEPOINT}.
+     *   <li>Job is running: the spec upgrade mode, or {@code SAVEPOINT} when the mode was changed to
+     *       last-state while HA was previously disabled.
+     *   <li>Otherwise: empty, meaning no upgrade is possible yet.
+     * </ul>
+     *
+     * <p>NOTE(review): {@code deployConfig} is not read by this base implementation.
+     *
+     * @param resource Resource being reconciled.
+     * @param deployConfig Deploy configuration.
+     * @param observeConfig Observe configuration.
+     * @return The upgrade mode to use, or empty if none is available.
+     */
+    protected Optional<UpgradeMode> getAvailableUpgradeMode(
+            CR resource, Configuration deployConfig, Configuration 
observeConfig) {
+        var status = resource.getStatus();
+        var upgradeMode = resource.getSpec().getJob().getUpgradeMode();
+        var changedToLastStateWithoutHa =
+                
ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+                        resource, observeConfig);
+
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            LOG.info("Stateless job, ready for upgrade");
+            return Optional.of(upgradeMode);
+        }
+
+        if (ReconciliationUtils.isJobInTerminalState(status)) {
+            LOG.info(
+                    "Job is in terminal state, ready for upgrade from observed 
latest checkpoint/savepoint");
+            return Optional.of(UpgradeMode.SAVEPOINT);
+        }
+
+        if (ReconciliationUtils.isJobRunning(status)) {
+            LOG.info("Job is in running state, ready for upgrade with {}", 
upgradeMode);
+            if (changedToLastStateWithoutHa) {
+                LOG.info(
+                        "Using savepoint upgrade mode when switching to 
last-state without HA previously enabled");
+                return Optional.of(UpgradeMode.SAVEPOINT);
+            } else {
+                return Optional.of(upgradeMode);
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Deploys the given spec, restoring from the last savepoint recorded in the status where
+     * applicable.
+     *
+     * <p>When the spec's upgrade mode is {@code STATELESS}, no savepoint is passed to {@code
+     * deploy}. Otherwise the location of the last savepoint from the job status is used; the
+     * savepoint stays empty if no last savepoint or no location is recorded.
+     *
+     * @param meta ObjectMeta of the related resource.
+     * @param spec Spec that should be deployed.
+     * @param status Status object of the resource.
+     * @param deployConfig Flink conf for the deployment.
+     * @param requireHaMetadata Passed through unchanged to {@code deploy}.
+     * @throws Exception If the deployment fails.
+     */
+    protected void restoreJob(
+            ObjectMeta meta,
+            SPEC spec,
+            STATUS status,
+            Configuration deployConfig,
+            boolean requireHaMetadata)
+            throws Exception {
+        Optional<String> savepointOpt = Optional.empty();
+
+        if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+            savepointOpt =
+                    
Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint())
+                            .flatMap(s -> 
Optional.ofNullable(s.getLocation()));
+        }
+
+        deploy(meta, spec, status, deployConfig, savepointOpt, 
requireHaMetadata);
+    }
+
+    /**
+     * Rolls the job back to the last stable spec.
+     *
+     * <p>The current job is cancelled with {@code STATELESS} if the current spec's upgrade mode is
+     * {@code STATELESS}, and with {@code LAST_STATE} otherwise. The job is then restored from the
+     * last stable spec using a deploy configuration built from that spec; HA metadata is required
+     * for the restore unless the upgrade mode is {@code STATELESS}. Finally the reconciliation state
+     * is set to {@code ROLLED_BACK}.
+     *
+     * @param resource Resource being rolled back.
+     * @param context Current context.
+     * @param observeConfig Observe configuration, used for cancelling the job.
+     * @throws Exception If cancelling or restoring the job fails.
+     */
+    @Override
+    protected void rollback(CR resource, Context context, Configuration 
observeConfig)
+            throws Exception {
+        var reconciliationStatus = 
resource.getStatus().getReconciliationStatus();
+        var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
+
+        UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
+
+        cancelJob(
+                resource,
+                upgradeMode == UpgradeMode.STATELESS
+                        ? UpgradeMode.STATELESS
+                        : UpgradeMode.LAST_STATE,
+                observeConfig);
+
+        restoreJob(
+                resource.getMetadata(),
+                rollbackSpec,
+                resource.getStatus(),
+                getDeployConfig(resource.getMetadata(), rollbackSpec, context),
+                upgradeMode != UpgradeMode.STATELESS);
+
+        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
+    }
+
+    /**
+     * Handles job-level changes outside of spec changes by delegating to {@code
+     * SavepointUtils.triggerSavepointIfNeeded}.
+     *
+     * @param resource Related Flink resource.
+     * @param observeConfig Observe configuration.
+     * @return The result of {@code triggerSavepointIfNeeded}.
+     * @throws Exception If the delegated call fails.
+     */
+    @Override
+    public boolean reconcileOtherChanges(CR resource, Configuration 
observeConfig)
+            throws Exception {
+        return SavepointUtils.triggerSavepointIfNeeded(flinkService, resource, 
observeConfig);
+    }
+
+    /**
+     * Cancel the job for the given resource using the specified upgrade mode.
+     *
+     * @param resource Related Flink resource.
+     * @param upgradeMode Upgrade mode to use when cancelling the job.
+     * @param observeConfig Observe configuration.
+     * @throws Exception If the job could not be cancelled.
+     */
+    protected abstract void cancelJob(
+            CR resource, UpgradeMode upgradeMode, Configuration observeConfig) 
throws Exception;
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 595b7ef..b89a88b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -17,34 +17,28 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
-import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,11 +46,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Optional;
 import java.util.UUID;
 
-/**
- * Reconciler responsible for handling the job lifecycle according to the 
desired and current
- * states.
- */
-public class ApplicationReconciler extends AbstractDeploymentReconciler {
+/** Reconciler for Flink Application deployments. */
+public class ApplicationReconciler
+        extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, 
FlinkDeploymentStatus> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationReconciler.class);
 
@@ -69,166 +61,30 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
     }
 
     @Override
-    public void reconcile(FlinkDeployment flinkApp, Context context) throws 
Exception {
-        ObjectMeta deployMeta = flinkApp.getMetadata();
-        FlinkDeploymentStatus status = flinkApp.getStatus();
-        ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
-                status.getReconciliationStatus();
-        FlinkDeploymentSpec lastReconciledSpec =
-                reconciliationStatus.deserializeLastReconciledSpec();
-        FlinkDeploymentSpec currentDeploySpec = flinkApp.getSpec();
-
-        JobSpec desiredJobSpec = currentDeploySpec.getJob();
-        Configuration deployConfig = configManager.getDeployConfig(deployMeta, 
currentDeploySpec);
-        if (lastReconciledSpec == null) {
-            LOG.debug("Deploying application for the first time");
-            deployFlinkJob(
-                    deployMeta,
-                    desiredJobSpec,
-                    status,
-                    deployConfig,
-                    
Optional.ofNullable(desiredJobSpec.getInitialSavepointPath()),
-                    false);
-            IngressUtils.updateIngressRules(
-                    deployMeta, currentDeploySpec, deployConfig, 
kubernetesClient);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkApp, JobState.RUNNING, deployConfig);
-            return;
-        }
-
-        if (!deployConfig.getBoolean(
-                        
KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT)
-                && SavepointUtils.savepointInProgress(status.getJobStatus())) {
-            LOG.info("Delaying job reconciliation until pending savepoint is 
completed.");
-            return;
-        }
-
-        Configuration observeConfig = configManager.getObserveConfig(flinkApp);
-        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
-        if (specChanged) {
-            if (newSpecIsAlreadyDeployed(flinkApp, deployConfig)) {
-                return;
-            }
-            LOG.debug("Detected spec change, starting upgrade process.");
-            JobState currentJobState = lastReconciledSpec.getJob().getState();
-            JobState desiredJobState = desiredJobSpec.getState();
-            JobState stateAfterReconcile = currentJobState;
-            if (currentJobState == JobState.RUNNING) {
-                if (desiredJobState == JobState.RUNNING) {
-                    LOG.info("Upgrading/Restarting running job, suspending 
first...");
-                }
-                Optional<UpgradeMode> availableUpgradeMode =
-                        getAvailableUpgradeMode(flinkApp, deployConfig);
-                if (availableUpgradeMode.isEmpty()) {
-                    return;
-                }
-                // We must record the upgrade mode used to the status later
-                desiredJobSpec.setUpgradeMode(availableUpgradeMode.get());
-                flinkService.cancelJob(flinkApp, availableUpgradeMode.get());
-                stateAfterReconcile = JobState.SUSPENDED;
-            }
-            if (currentJobState == JobState.SUSPENDED && desiredJobState == 
JobState.RUNNING) {
-                restoreJob(
-                        deployMeta,
-                        desiredJobSpec,
-                        status,
-                        deployConfig,
-                        // We decide to enforce HA based on how job was 
previously suspended
-                        lastReconciledSpec.getJob().getUpgradeMode() == 
UpgradeMode.LAST_STATE);
-                stateAfterReconcile = JobState.RUNNING;
-            }
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkApp, stateAfterReconcile, deployConfig);
-            IngressUtils.updateIngressRules(
-                    deployMeta, currentDeploySpec, deployConfig, 
kubernetesClient);
-        } else if (ReconciliationUtils.shouldRollBack(
-                flinkService, reconciliationStatus, observeConfig)) {
-            rollbackApplication(flinkApp);
-        } else if (ReconciliationUtils.shouldRecoverDeployment(observeConfig, 
flinkApp)) {
-            recoverJmDeployment(flinkApp, observeConfig);
-        } else {
-            if (!SavepointUtils.triggerSavepointIfNeeded(flinkService, 
flinkApp, observeConfig)) {
-                LOG.info("Deployment is fully reconciled, nothing to do.");
-            }
-        }
+    protected Configuration getObserveConfig(FlinkDeployment deployment, 
Context context) {
+        return configManager.getObserveConfig(deployment);
     }
 
-    private void rollbackApplication(FlinkDeployment flinkApp) throws 
Exception {
-        ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
-                flinkApp.getStatus().getReconciliationStatus();
-
-        if (initiateRollBack(flinkApp.getStatus())) {
-            return;
-        }
-
-        LOG.warn("Executing rollback operation");
-        FlinkDeploymentSpec rollbackSpec = 
reconciliationStatus.deserializeLastStableSpec();
-        Configuration rollbackConfig =
-                configManager.getDeployConfig(flinkApp.getMetadata(), 
rollbackSpec);
-        UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-
-        flinkService.cancelJob(
-                flinkApp,
-                upgradeMode == UpgradeMode.STATELESS
-                        ? UpgradeMode.STATELESS
-                        : UpgradeMode.LAST_STATE);
-
-        restoreJob(
-                flinkApp.getMetadata(),
-                rollbackSpec.getJob(),
-                flinkApp.getStatus(),
-                rollbackConfig,
-                upgradeMode != UpgradeMode.STATELESS);
-
-        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
-        IngressUtils.updateIngressRules(
-                flinkApp.getMetadata(), rollbackSpec, rollbackConfig, 
kubernetesClient);
+    @Override
+    protected Configuration getDeployConfig(
+            ObjectMeta deployMeta, FlinkDeploymentSpec currentDeploySpec, 
Context context) {
+        return configManager.getDeployConfig(deployMeta, currentDeploySpec);
     }
 
-    private void recoverJmDeployment(FlinkDeployment deployment, Configuration 
observeConfig)
-            throws Exception {
-        LOG.info("Missing Flink Cluster deployment, trying to recover...");
-        FlinkDeploymentSpec specToRecover = 
ReconciliationUtils.getDeployedSpec(deployment);
-        restoreJob(
-                deployment.getMetadata(),
-                specToRecover.getJob(),
-                deployment.getStatus(),
-                observeConfig,
-                true);
-    }
+    @Override
+    protected Optional<UpgradeMode> getAvailableUpgradeMode(
+            FlinkDeployment deployment, Configuration deployConfig, 
Configuration observeConfig) {
 
-    private Optional<UpgradeMode> getAvailableUpgradeMode(
-            FlinkDeployment deployment, Configuration deployConfig) {
         var status = deployment.getStatus();
-        var upgradeMode = deployment.getSpec().getJob().getUpgradeMode();
-        var changedToLastStateWithoutHa =
-                
ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
-                        deployment, configManager);
-
-        if (upgradeMode == UpgradeMode.STATELESS) {
-            LOG.info("Stateless job, ready for upgrade");
-            return Optional.of(upgradeMode);
-        }
-
-        if (ReconciliationUtils.isJobInTerminalState(status)) {
-            LOG.info(
-                    "Job is in terminal state, ready for upgrade from observed 
latest checkpoint/savepoint");
-            return Optional.of(UpgradeMode.SAVEPOINT);
-        }
+        var availableUpgradeMode =
+                super.getAvailableUpgradeMode(deployment, deployConfig, 
observeConfig);
 
-        if (ReconciliationUtils.isJobRunning(status)) {
-            LOG.info("Job is in running state, ready for upgrade with {}", 
upgradeMode);
-            if (changedToLastStateWithoutHa) {
-                LOG.info(
-                        "Using savepoint upgrade mode when switching to 
last-state without HA previously enabled");
-                return Optional.of(UpgradeMode.SAVEPOINT);
-            } else {
-                return Optional.of(upgradeMode);
-            }
+        if (availableUpgradeMode.isPresent()) {
+            return availableUpgradeMode;
         }
 
         if (FlinkUtils.isKubernetesHAActivated(deployConfig)
-                && 
FlinkUtils.isKubernetesHAActivated(configManager.getObserveConfig(deployment))
+                && FlinkUtils.isKubernetesHAActivated(observeConfig)
                 && flinkService.isHaMetadataAvailable(deployConfig)) {
             LOG.info(
                     "Job is not running but HA metadata is available for last 
state restore, ready for upgrade");
@@ -242,17 +98,17 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
                             + "It is possible that the job has finished or 
terminally failed, or the configmaps have been deleted. "
                             + "Manual restore required.",
                     "UpgradeFailed");
-        } else {
-            LOG.info(
-                    "Job is not running yet and HA metadata is not available, 
waiting for upgradeable state");
-            return Optional.empty();
         }
+
+        LOG.info(
+                "Job is not running yet and HA metadata is not available, 
waiting for upgradeable state");
+        return Optional.empty();
     }
 
-    @VisibleForTesting
-    protected void deployFlinkJob(
+    @Override
+    protected void deploy(
             ObjectMeta meta,
-            JobSpec jobSpec,
+            FlinkDeploymentSpec spec,
             FlinkDeploymentStatus status,
             Configuration deployConfig,
             Optional<String> savepoint,
@@ -282,27 +138,18 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
                             .getFlinkShutdownClusterTimeout()
                             .toSeconds());
         }
-        flinkService.submitApplicationCluster(jobSpec, deployConfig, 
requireHaMetadata);
+        flinkService.submitApplicationCluster(spec.getJob(), deployConfig, 
requireHaMetadata);
         
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
         
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+        IngressUtils.updateIngressRules(meta, spec, deployConfig, 
kubernetesClient);
     }
 
-    private void restoreJob(
-            ObjectMeta meta,
-            JobSpec jobSpec,
-            FlinkDeploymentStatus status,
-            Configuration deployConfig,
-            boolean requireHaMetadata)
+    @Override
+    protected void cancelJob(
+            FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration 
observeConfig)
             throws Exception {
-        Optional<String> savepointOpt = Optional.empty();
-
-        if (jobSpec.getUpgradeMode() != UpgradeMode.STATELESS) {
-            savepointOpt =
-                    
Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint())
-                            .flatMap(s -> 
Optional.ofNullable(s.getLocation()));
-        }
-
-        deployFlinkJob(meta, jobSpec, status, deployConfig, savepointOpt, 
requireHaMetadata);
+        flinkService.cancelJob(deployment, upgradeMode, observeConfig);
     }
 
     // Workaround for https://issues.apache.org/jira/browse/FLINK-27569
@@ -323,14 +170,42 @@ public class ApplicationReconciler extends 
AbstractDeploymentReconciler {
         }
     }
 
+    /**
+     * Runs the superclass handling first and, if that took no action, recovers a missing
+     * JobManager deployment when {@code shouldRecoverDeployment} allows it.
+     *
+     * @param deployment FlinkDeployment being reconciled.
+     * @param observeConfig Observe configuration.
+     * @return True if the superclass took an action or a recovery was started, false otherwise.
+     * @throws Exception If the superclass handling or the recovery fails.
+     */
+    @Override
+    public boolean reconcileOtherChanges(FlinkDeployment deployment, 
Configuration observeConfig)
+            throws Exception {
+        if (super.reconcileOtherChanges(deployment, observeConfig)) {
+            return true;
+        }
+
+        if (shouldRecoverDeployment(observeConfig, deployment)) {
+            recoverJmDeployment(deployment, observeConfig);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Redeploys the currently deployed spec after the Flink cluster deployment went missing. The
+     * spec is obtained from {@code ReconciliationUtils.getDeployedSpec} and restored via {@code
+     * restoreJob} with HA metadata required; the observe configuration is passed as the deploy
+     * configuration.
+     *
+     * @param deployment FlinkDeployment to recover.
+     * @param observeConfig Observe configuration, used as the configuration for the restore.
+     * @throws Exception If restoring the job fails.
+     */
+    private void recoverJmDeployment(FlinkDeployment deployment, Configuration 
observeConfig)
+            throws Exception {
+        LOG.info("Missing Flink Cluster deployment, trying to recover...");
+        FlinkDeploymentSpec specToRecover = 
ReconciliationUtils.getDeployedSpec(deployment);
+        restoreJob(
+                deployment.getMetadata(),
+                specToRecover,
+                deployment.getStatus(),
+                observeConfig,
+                true);
+    }
+
     @Override
     @SneakyThrows
-    protected void shutdown(FlinkDeployment flinkApp) {
-        var status = flinkApp.getStatus();
+    protected DeleteControl cleanupInternal(FlinkDeployment deployment, 
Context context) {
+        var status = deployment.getStatus();
         if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
-            flinkService.deleteClusterDeployment(flinkApp.getMetadata(), 
status, true);
+            flinkService.deleteClusterDeployment(deployment.getMetadata(), 
status, true);
         } else {
-            flinkService.cancelJob(flinkApp, UpgradeMode.STATELESS);
+            flinkService.cancelJob(
+                    deployment, UpgradeMode.STATELESS, 
configManager.getObserveConfig(deployment));
         }
+        return DeleteControl.defaultDelete();
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index ccf74f5..ba8d390 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -33,12 +33,14 @@ import 
org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -46,7 +48,9 @@ import java.util.stream.Collectors;
  * Reconciler responsible for handling the session cluster lifecycle according 
to the desired and
  * current states.
  */
-public class SessionReconciler extends AbstractDeploymentReconciler {
+public class SessionReconciler
+        extends AbstractFlinkResourceReconciler<
+                FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SessionReconciler.class);
 
@@ -59,42 +63,28 @@ public class SessionReconciler extends 
AbstractDeploymentReconciler {
     }
 
     @Override
-    public void reconcile(FlinkDeployment flinkApp, Context context) throws 
Exception {
-        FlinkDeploymentStatus status = flinkApp.getStatus();
-        ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
-                status.getReconciliationStatus();
-        FlinkDeploymentSpec lastReconciledSpec =
-                reconciliationStatus.deserializeLastReconciledSpec();
-        FlinkDeploymentSpec currentDeploySpec = flinkApp.getSpec();
-
-        if (lastReconciledSpec == null) {
-            Configuration conf =
-                    configManager.getDeployConfig(flinkApp.getMetadata(), 
currentDeploySpec);
-            flinkService.submitSessionCluster(conf);
-            
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-            IngressUtils.updateIngressRules(
-                    flinkApp.getMetadata(), currentDeploySpec, conf, 
kubernetesClient);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
null, conf);
-            return;
-        }
+    protected Configuration getDeployConfig(
+            ObjectMeta meta, FlinkDeploymentSpec spec, Context ctx) {
+        return configManager.getDeployConfig(meta, spec);
+    }
 
-        Configuration observeConfig = configManager.getObserveConfig(flinkApp);
-        boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
-        if (specChanged) {
-            var deployConf =
-                    configManager.getDeployConfig(flinkApp.getMetadata(), 
currentDeploySpec);
-            if (newSpecIsAlreadyDeployed(flinkApp, deployConf)) {
-                return;
-            }
-            LOG.debug("Detected spec change, starting upgrade process.");
-            upgradeSessionCluster(flinkApp, currentDeploySpec, deployConf);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, 
null, deployConf);
-        } else if (ReconciliationUtils.shouldRollBack(
-                flinkService, reconciliationStatus, observeConfig)) {
-            rollbackSessionCluster(flinkApp);
-        } else if (ReconciliationUtils.shouldRecoverDeployment(observeConfig, 
flinkApp)) {
-            recoverSession(flinkApp, observeConfig);
-        }
+    @Override
+    protected Configuration getObserveConfig(FlinkDeployment resource, Context 
context) {
+        return configManager.getObserveConfig(resource);
+    }
+
+    @Override
+    protected boolean readyToReconcile(
+            FlinkDeployment deployment, Context ctx, Configuration 
deployConfig) {
+        return true;
+    }
+
+    @Override
+    protected void reconcileSpecChange(
+            FlinkDeployment deployment, Configuration observeConfig, 
Configuration deployConfig)
+            throws Exception {
+        upgradeSessionCluster(deployment, deployment.getSpec(), deployConfig);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(deployment, 
null, deployConfig);
     }
 
     private void upgradeSessionCluster(
@@ -112,24 +102,33 @@ public class SessionReconciler extends 
AbstractDeploymentReconciler {
                         .getOperatorConfiguration()
                         .getFlinkShutdownClusterTimeout()
                         .toSeconds());
-        flinkService.submitSessionCluster(effectiveConfig);
-        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-        IngressUtils.updateIngressRules(
-                deployment.getMetadata(), deploySpec, effectiveConfig, 
kubernetesClient);
+        deploy(
+                deployment.getMetadata(),
+                deploySpec,
+                deployment.getStatus(),
+                effectiveConfig,
+                Optional.empty(),
+                false);
     }
 
-    private void recoverSession(FlinkDeployment deployment, Configuration 
effectiveConfig)
+    @Override
+    protected void deploy(
+            ObjectMeta meta,
+            FlinkDeploymentSpec spec,
+            FlinkDeploymentStatus status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
             throws Exception {
-        flinkService.submitSessionCluster(effectiveConfig);
-        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+        flinkService.submitSessionCluster(deployConfig);
+        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+        IngressUtils.updateIngressRules(meta, spec, deployConfig, 
kubernetesClient);
     }
 
-    private void rollbackSessionCluster(FlinkDeployment deployment) throws 
Exception {
+    @Override
+    protected void rollback(FlinkDeployment deployment, Context ctx, 
Configuration observeConfig)
+            throws Exception {
         FlinkDeploymentStatus status = deployment.getStatus();
-        if (initiateRollBack(status)) {
-            return;
-        }
-
         ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
                 status.getReconciliationStatus();
         FlinkDeploymentSpec rollbackSpec = 
reconciliationStatus.deserializeLastStableSpec();
@@ -140,15 +139,23 @@ public class SessionReconciler extends 
AbstractDeploymentReconciler {
     }
 
     @Override
-    protected void shutdown(FlinkDeployment deployment) {
-        LOG.info("Stopping session cluster");
-        flinkService.deleteClusterDeployment(
-                deployment.getMetadata(), deployment.getStatus(), true);
+    public boolean reconcileOtherChanges(FlinkDeployment flinkApp, 
Configuration observeConfig)
+            throws Exception {
+        if (shouldRecoverDeployment(observeConfig, flinkApp)) {
+            recoverSession(flinkApp, observeConfig);
+            return true;
+        }
+        return false;
     }
 
-    @Override
-    public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
+    private void recoverSession(FlinkDeployment deployment, Configuration 
effectiveConfig)
+            throws Exception {
+        flinkService.submitSessionCluster(effectiveConfig);
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+    }
 
+    @Override
+    public DeleteControl cleanupInternal(FlinkDeployment deployment, Context 
context) {
         Set<FlinkSessionJob> sessionJobs = 
context.getSecondaryResources(FlinkSessionJob.class);
         if (!sessionJobs.isEmpty()) {
             var error =
@@ -158,7 +165,7 @@ public class SessionReconciler extends 
AbstractDeploymentReconciler {
                                     .map(job -> job.getMetadata().getName())
                                     .collect(Collectors.toList()));
             if (eventRecorder.triggerEvent(
-                    flinkApp,
+                    deployment,
                     EventUtils.Type.Warning,
                     "Cleanup",
                     error,
@@ -172,7 +179,10 @@ public class SessionReconciler extends 
AbstractDeploymentReconciler {
                                     .getReconcileInterval()
                                     .toMillis());
         } else {
-            return super.cleanup(flinkApp, context);
+            LOG.info("Stopping session cluster");
+            flinkService.deleteClusterDeployment(
+                    deployment.getMetadata(), deployment.getStatus(), true);
+            return DeleteControl.defaultDelete();
         }
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
deleted file mode 100644
index 0079d80..0000000
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
-import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.util.Preconditions;
-
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Optional;
-
-/** The reconciler for the {@link FlinkSessionJob}. */
-public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSessionJobReconciler.class);
-
-    private final FlinkConfigManager configManager;
-    private final KubernetesClient kubernetesClient;
-    private final FlinkService flinkService;
-
-    public FlinkSessionJobReconciler(
-            KubernetesClient kubernetesClient,
-            FlinkService flinkService,
-            FlinkConfigManager configManager) {
-        this.kubernetesClient = kubernetesClient;
-        this.flinkService = flinkService;
-        this.configManager = configManager;
-    }
-
-    @Override
-    public void reconcile(FlinkSessionJob flinkSessionJob, Context context) 
throws Exception {
-
-        SessionJobHelper helper = new SessionJobHelper(flinkSessionJob, LOG);
-        FlinkSessionJobSpec lastReconciledSpec =
-                flinkSessionJob
-                        .getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpec();
-
-        Optional<FlinkDeployment> flinkDepOptional =
-                context.getSecondaryResource(FlinkDeployment.class);
-
-        // if session cluster is not ready, we can't do reconcile for the job.
-        if (!helper.sessionClusterReady(flinkDepOptional)) {
-            return;
-        }
-
-        Configuration sessionJobConfig =
-                configManager.getSessionJobConfig(flinkDepOptional.get(), 
flinkSessionJob);
-
-        if (lastReconciledSpec == null) {
-            submitAndInitStatus(
-                    flinkSessionJob,
-                    sessionJobConfig,
-                    Optional.ofNullable(
-                                    
flinkSessionJob.getSpec().getJob().getInitialSavepointPath())
-                            .orElse(null));
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkSessionJob, JobState.RUNNING, sessionJobConfig);
-            return;
-        }
-
-        if (!sessionJobConfig.getBoolean(
-                        
KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT)
-                && 
SavepointUtils.savepointInProgress(flinkSessionJob.getStatus().getJobStatus())) 
{
-            LOG.info("Delaying job reconciliation until pending savepoint is 
completed");
-            return;
-        }
-
-        boolean specChanged = helper.specChanged(lastReconciledSpec);
-        var jobSpec = flinkSessionJob.getSpec().getJob();
-
-        if (specChanged) {
-            JobState currentJobState = lastReconciledSpec.getJob().getState();
-            JobState desiredJobState = jobSpec.getState();
-            UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
-            JobState stateAfterReconcile = currentJobState;
-            if (currentJobState == JobState.RUNNING) {
-                if (desiredJobState == JobState.RUNNING) {
-                    LOG.info("Upgrading/Restarting running job, suspending 
first...");
-                }
-                stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, 
sessionJobConfig);
-            }
-            if (currentJobState == JobState.SUSPENDED && desiredJobState == 
JobState.RUNNING) {
-                if (upgradeMode == UpgradeMode.STATELESS) {
-                    submitAndInitStatus(flinkSessionJob, sessionJobConfig, 
null);
-                } else if (upgradeMode == UpgradeMode.LAST_STATE
-                        || upgradeMode == UpgradeMode.SAVEPOINT) {
-                    restoreFromLastSavepoint(flinkSessionJob, 
sessionJobConfig);
-                }
-                stateAfterReconcile = JobState.RUNNING;
-            }
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    flinkSessionJob, stateAfterReconcile, sessionJobConfig);
-        } else {
-            if (!SavepointUtils.triggerSavepointIfNeeded(
-                    flinkService, flinkSessionJob, sessionJobConfig)) {
-                LOG.info("Session Job is fully reconciled, nothing to do.");
-            }
-        }
-    }
-
-    @Override
-    public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
-        Optional<FlinkDeployment> flinkDepOptional =
-                context.getSecondaryResource(FlinkDeployment.class);
-
-        if (flinkDepOptional.isPresent()) {
-            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
-            if (jobID != null) {
-                try {
-                    flinkService.cancelSessionJob(
-                            JobID.fromHexString(jobID),
-                            UpgradeMode.STATELESS,
-                            
configManager.getSessionJobConfig(flinkDepOptional.get(), sessionJob));
-                } catch (Exception e) {
-                    LOG.error("Failed to cancel job.", e);
-                }
-            }
-        } else {
-            LOG.info("Session cluster deployment not available");
-        }
-        return DeleteControl.defaultDelete();
-    }
-
-    private void submitAndInitStatus(
-            FlinkSessionJob sessionJob, Configuration effectiveConfig, 
@Nullable String savepoint)
-            throws Exception {
-        var jobID = flinkService.submitJobToSessionCluster(sessionJob, 
effectiveConfig, savepoint);
-        sessionJob
-                .getStatus()
-                .setJobStatus(
-                        new JobStatus()
-                                .toBuilder()
-                                .jobId(jobID.toHexString())
-                                
.state(org.apache.flink.api.common.JobStatus.RECONCILING.name())
-                                .build());
-    }
-
-    private void restoreFromLastSavepoint(
-            FlinkSessionJob flinkSessionJob, Configuration effectiveConfig) 
throws Exception {
-        JobStatus jobStatus = flinkSessionJob.getStatus().getJobStatus();
-        Optional<String> savepointOpt =
-                
Optional.ofNullable(jobStatus.getSavepointInfo().getLastSavepoint())
-                        .flatMap(s -> Optional.ofNullable(s.getLocation()));
-
-        submitAndInitStatus(flinkSessionJob, effectiveConfig, 
savepointOpt.orElse(null));
-    }
-
-    private Optional<String> internalSuspendJob(
-            FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration 
effectiveConfig)
-            throws Exception {
-        final String jobIdString = 
sessionJob.getStatus().getJobStatus().getJobId();
-        Preconditions.checkNotNull(jobIdString, "The job to be suspend should 
not be null");
-        return flinkService.cancelSessionJob(
-                JobID.fromHexString(jobIdString), upgradeMode, 
effectiveConfig);
-    }
-
-    private JobState suspendJob(
-            FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration 
effectiveConfig)
-            throws Exception {
-        final Optional<String> savepointOpt =
-                internalSuspendJob(sessionJob, upgradeMode, effectiveConfig);
-
-        JobStatus jobStatus = sessionJob.getStatus().getJobStatus();
-        JobState stateAfterReconcile = JobState.SUSPENDED;
-        jobStatus.setState(stateAfterReconcile.name());
-        savepointOpt.ifPresent(
-                location -> {
-                    Savepoint sp = Savepoint.of(location, 
SavepointTriggerType.UPGRADE);
-                    jobStatus.getSavepointInfo().updateLastSavepoint(sp);
-                });
-        return stateAfterReconcile;
-    }
-}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java
deleted file mode 100644
index a1f36c2..0000000
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobHelper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
-
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
-import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-
-import org.slf4j.Logger;
-
-import java.util.Optional;
-
-/** A tool for session job management condition checker. */
-public class SessionJobHelper {
-
-    private final Logger logger;
-    private final FlinkSessionJob sessionJob;
-
-    public SessionJobHelper(FlinkSessionJob sessionJob, Logger logger) {
-        this.sessionJob = sessionJob;
-        this.logger = logger;
-    }
-
-    public boolean specChanged(FlinkSessionJobSpec lastReconciledSpec) {
-        return !sessionJob.getSpec().equals(lastReconciledSpec);
-    }
-
-    public boolean sessionClusterReady(Optional<FlinkDeployment> 
flinkDeploymentOpt) {
-        if (flinkDeploymentOpt.isPresent()) {
-            var flinkdep = flinkDeploymentOpt.get();
-            var jobmanagerDeploymentStatus = 
flinkdep.getStatus().getJobManagerDeploymentStatus();
-            if (jobmanagerDeploymentStatus != 
JobManagerDeploymentStatus.READY) {
-                logger.info(
-                        "Session cluster deployment is in {} status, not ready 
for serve",
-                        jobmanagerDeploymentStatus);
-                return false;
-            } else {
-                return true;
-            }
-        } else {
-            logger.warn("Session cluster deployment is not found");
-            return false;
-        }
-    }
-}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
new file mode 100644
index 0000000..7757500
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** The reconciler for the {@link FlinkSessionJob}. */
+public class SessionJobReconciler
+        extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, 
FlinkSessionJobStatus> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SessionJobReconciler.class);
+
+    public SessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
+    }
+
+    @Override
+    protected Configuration getObserveConfig(FlinkSessionJob sessionJob, 
Context context) {
+        return getDeployConfig(sessionJob.getMetadata(), sessionJob.getSpec(), 
context);
+    }
+
+    @Override
+    protected Configuration getDeployConfig(
+            ObjectMeta deployMeta, FlinkSessionJobSpec currentDeploySpec, 
Context context) {
+        Optional<FlinkDeployment> deploymentOpt =
+                context.getSecondaryResource(FlinkDeployment.class);
+
+        if (!sessionClusterReady(deploymentOpt)) {
+            return null;
+        }
+        return configManager.getSessionJobConfig(deploymentOpt.get(), 
currentDeploySpec);
+    }
+
+    @Override
+    public boolean readyToReconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration 
deployConfig) {
+        return 
sessionClusterReady(context.getSecondaryResource(FlinkDeployment.class))
+                && super.readyToReconcile(flinkSessionJob, context, 
deployConfig);
+    }
+
+    @Override
+    protected void deploy(
+            ObjectMeta meta,
+            FlinkSessionJobSpec sessionJobSpec,
+            FlinkSessionJobStatus status,
+            Configuration deployConfig,
+            Optional<String> savepoint,
+            boolean requireHaMetadata)
+            throws Exception {
+        var jobID =
+                flinkService.submitJobToSessionCluster(
+                        meta, sessionJobSpec, deployConfig, 
savepoint.orElse(null));
+        status.setJobStatus(
+                new JobStatus()
+                        .toBuilder()
+                        .jobId(jobID.toHexString())
+                        
.state(org.apache.flink.api.common.JobStatus.RECONCILING.name())
+                        .build());
+    }
+
+    @Override
+    protected void cancelJob(
+            FlinkSessionJob resource, UpgradeMode upgradeMode, Configuration 
observeConfig)
+            throws Exception {
+        flinkService.cancelSessionJob(resource, upgradeMode, observeConfig);
+    }
+
+    @Override
+    public DeleteControl cleanupInternal(FlinkSessionJob sessionJob, Context 
context) {
+        Optional<FlinkDeployment> flinkDepOptional =
+                context.getSecondaryResource(FlinkDeployment.class);
+
+        if (flinkDepOptional.isPresent()) {
+            String jobID = sessionJob.getStatus().getJobStatus().getJobId();
+            if (jobID != null) {
+                try {
+                    cancelJob(
+                            sessionJob,
+                            UpgradeMode.STATELESS,
+                            getObserveConfig(sessionJob, context));
+                } catch (Exception e) {
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    public static boolean sessionClusterReady(Optional<FlinkDeployment> 
flinkDeploymentOpt) {
+        if (flinkDeploymentOpt.isPresent()) {
+            var flinkdep = flinkDeploymentOpt.get();
+            var jobmanagerDeploymentStatus = 
flinkdep.getStatus().getJobManagerDeploymentStatus();
+            if (jobmanagerDeploymentStatus != 
JobManagerDeploymentStatus.READY) {
+                LOG.info(
+                        "Session cluster deployment is in {} status, not ready 
for serve",
+                        jobmanagerDeploymentStatus);
+                return false;
+            } else {
+                return true;
+            }
+        } else {
+            LOG.warn("Session cluster deployment is not found");
+            return false;
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 2ba73e0..e2d2994 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -197,19 +198,20 @@ public class FlinkService {
     }
 
     public JobID submitJobToSessionCluster(
-            FlinkSessionJob sessionJob, Configuration conf, @Nullable String 
savepoint)
+            ObjectMeta meta,
+            FlinkSessionJobSpec spec,
+            Configuration conf,
+            @Nullable String savepoint)
             throws Exception {
-        var jarRunResponseBody = runJar(sessionJob, uploadJar(sessionJob, 
conf), conf, savepoint);
+        var jarRunResponseBody =
+                runJar(spec.getJob(), uploadJar(meta, spec, conf), conf, 
savepoint);
         var jobID = jarRunResponseBody.getJobId();
         LOG.info("Submitted job: {} to session cluster.", jobID);
         return jobID;
     }
 
     private JarRunResponseBody runJar(
-            FlinkSessionJob sessionJob,
-            JarUploadResponseBody response,
-            Configuration conf,
-            String savepoint) {
+            JobSpec job, JarUploadResponseBody response, Configuration conf, 
String savepoint) {
         String jarId =
                 
response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
         // we generate jobID in advance to help deduplicate job submission.
@@ -219,7 +221,6 @@ public class FlinkService {
             JarRunHeaders headers = JarRunHeaders.getInstance();
             JarRunMessageParameters parameters = 
headers.getUnresolvedMessageParameters();
             parameters.jarIdPathParameter.resolve(jarId);
-            JobSpec job = sessionJob.getSpec().getJob();
             JarRunRequestBody runRequestBody =
                     new JarRunRequestBody(
                             job.getEntryClass(),
@@ -249,17 +250,16 @@ public class FlinkService {
         }
     }
 
-    private JarUploadResponseBody uploadJar(FlinkSessionJob sessionJob, 
Configuration conf)
-            throws Exception {
-        String targetDir = artifactManager.generateJarDir(sessionJob);
-        File jarFile =
-                
artifactManager.fetch(sessionJob.getSpec().getJob().getJarURI(), conf, 
targetDir);
+    private JarUploadResponseBody uploadJar(
+            ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration 
conf) throws Exception {
+        String targetDir = artifactManager.generateJarDir(objectMeta, spec);
+        File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, 
targetDir);
         Preconditions.checkArgument(
                 jarFile.exists(),
                 String.format("The jar file %s not exists", 
jarFile.getAbsolutePath()));
         JarUploadHeaders headers = JarUploadHeaders.getInstance();
-        String clusterId = sessionJob.getSpec().getDeploymentName();
-        String namespace = sessionJob.getMetadata().getNamespace();
+        String clusterId = spec.getDeploymentName();
+        String namespace = objectMeta.getNamespace();
         int port = conf.getInteger(RestOptions.PORT);
         String host =
                 ObjectUtils.firstNonNull(
@@ -370,8 +370,8 @@ public class FlinkService {
                 config, clusterId, (c, e) -> new 
StandaloneClientHAServices(restServerAddress));
     }
 
-    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode) 
throws Exception {
-        var conf = configManager.getObserveConfig(deployment);
+    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, 
Configuration conf)
+            throws Exception {
         var deploymentStatus = deployment.getStatus();
         var jobIdString = deploymentStatus.getJobStatus().getJobId();
         var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : 
null;
@@ -489,8 +489,15 @@ public class FlinkService {
                         .toSeconds());
     }
 
-    public Optional<String> cancelSessionJob(
-            JobID jobID, UpgradeMode upgradeMode, Configuration conf) throws 
Exception {
+    public void cancelSessionJob(
+            FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration 
conf)
+            throws Exception {
+
+        var jobStatus = sessionJob.getStatus().getJobStatus();
+        var jobIdString = jobStatus.getJobId();
+        Preconditions.checkNotNull(jobIdString, "The job to be suspend should 
not be null");
+        var jobId = JobID.fromHexString(jobIdString);
+
         Optional<String> savepointOpt = Optional.empty();
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             final String clusterId = clusterClient.getClusterId();
@@ -498,7 +505,7 @@ public class FlinkService {
                 case STATELESS:
                     LOG.info("Cancelling job.");
                     clusterClient
-                            .cancel(jobID)
+                            .cancel(jobId)
                             .get(
                                     configManager
                                             .getOperatorConfiguration()
@@ -519,7 +526,7 @@ public class FlinkService {
                         String savepoint =
                                 clusterClient
                                         .stopWithSavepoint(
-                                                jobID,
+                                                jobId,
                                                 false,
                                                 savepointDirectory,
                                                 conf.get(FLINK_VERSION)
@@ -535,7 +542,7 @@ public class FlinkService {
                                 String.format(
                                         "Timed out stopping the job %s in 
Flink cluster %s with savepoint, "
                                                 + "please configure a larger 
timeout via '%s'",
-                                        jobID,
+                                        jobId,
                                         clusterId,
                                         
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
                                 exception);
@@ -546,7 +553,13 @@ public class FlinkService {
                     throw new RuntimeException("Unsupported upgrade mode " + 
upgradeMode);
             }
         }
-        return savepointOpt;
+
+        jobStatus.setState(JobStatus.FINISHED.name());
+        savepointOpt.ifPresent(
+                location -> {
+                    Savepoint sp = Savepoint.of(location, 
SavepointTriggerType.UPGRADE);
+                    jobStatus.getSavepointInfo().updateLastSavepoint(sp);
+                });
     }
 
     public void triggerSavepoint(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 8448b3d..012d789 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -318,7 +318,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
                     && deployment.getStatus().getJobManagerDeploymentStatus()
                             != JobManagerDeploymentStatus.MISSING
                     && 
ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
-                            deployment, configManager)) {
+                            deployment, 
configManager.getObserveConfig(deployment))) {
                 return Optional.of(
                         String.format(
                                 "Job could not be upgraded to last-state while 
config key[%s] is not set",
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index c1b4e8e..0e29516 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -28,10 +28,9 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
@@ -85,7 +84,6 @@ public class TestingFlinkService extends FlinkService {
 
     private final List<Tuple2<String, JobStatusMessage>> jobs = new 
ArrayList<>();
     private final Map<JobID, String> jobErrors = new HashMap<>();
-    private final Map<JobID, SubmittedJobInfo> sessionJobs = new HashMap<>();
     private final Set<String> sessions = new HashSet<>();
     private boolean isPortReady = true;
     private boolean haDataAvailable = true;
@@ -138,7 +136,6 @@ public class TestingFlinkService extends FlinkService {
     public void clear() {
         jobs.clear();
         sessions.clear();
-        sessionJobs.clear();
     }
 
     @Override
@@ -185,16 +182,24 @@ public class TestingFlinkService extends FlinkService {
 
     @Override
     public JobID submitJobToSessionCluster(
-            FlinkSessionJob sessionJob, Configuration conf, @Nullable String 
savepoint) {
+            ObjectMeta meta,
+            FlinkSessionJobSpec spec,
+            Configuration conf,
+            @Nullable String savepoint)
+            throws Exception {
+
+        if (deployFailure) {
+            throw new Exception("Deployment failure");
+        }
         JobID jobID = new JobID();
         JobStatusMessage jobStatusMessage =
                 new JobStatusMessage(
                         jobID,
-                        sessionJob.getMetadata().getName(),
+                        conf.getString(KubernetesConfigOptions.CLUSTER_ID),
                         JobStatus.RUNNING,
                         System.currentTimeMillis());
-        sessionJob.getStatus().getJobStatus().setJobId(jobID.toHexString());
-        sessionJobs.put(jobID, new SubmittedJobInfo(savepoint, 
jobStatusMessage, conf));
+
+        jobs.add(Tuple2.of(savepoint, jobStatusMessage));
         return jobID;
     }
 
@@ -218,24 +223,6 @@ public class TestingFlinkService extends FlinkService {
         return jobs.stream().filter(t -> 
!t.f1.getJobState().isTerminalState()).count();
     }
 
-    public Map<JobID, SubmittedJobInfo> listSessionJobs() {
-        return new HashMap<>(sessionJobs);
-    }
-
-    @Override
-    public Optional<String> cancelSessionJob(
-            JobID jobID, UpgradeMode upgradeMode, Configuration conf) throws 
Exception {
-        if (sessionJobs.remove(jobID) == null) {
-            throw new Exception("Job not found");
-        }
-
-        if (upgradeMode == UpgradeMode.SAVEPOINT) {
-            return Optional.of("savepoint_" + savepointCounter++);
-        } else {
-            return Optional.empty();
-        }
-    }
-
     @Override
     public void triggerSavepoint(
             String jobId,
@@ -275,12 +262,8 @@ public class TestingFlinkService extends FlinkService {
                                     
.equals(KubernetesDeploymentTarget.APPLICATION.getName())) {
                         throw new RuntimeException("Trying to list a job 
without submitting it");
                     }
-                    var lists = jobs.stream().map(t -> 
t.f1).collect(Collectors.toList());
-                    lists.addAll(
-                            sessionJobs.values().stream()
-                                    .map(t -> t.jobStatusMessage)
-                                    .collect(Collectors.toList()));
-                    return CompletableFuture.completedFuture(lists);
+                    return CompletableFuture.completedFuture(
+                            jobs.stream().map(t -> 
t.f1).collect(Collectors.toList()));
                 });
 
         clusterClient.setStopWithSavepointFunction(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java
index 380d66d..719420c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/ArtifactManagerTest.java
@@ -62,7 +62,9 @@ public class ArtifactManagerTest {
 
     @Test
     public void testGenerateJarDir() {
-        String baseDir = 
artifactManager.generateJarDir(TestUtils.buildSessionJob());
+        var sessionJob = TestUtils.buildSessionJob();
+        String baseDir =
+                artifactManager.generateJarDir(sessionJob.getMetadata(), 
sessionJob.getSpec());
         String expected =
                 tempDir.toString()
                         + File.separator
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 9f4a315..a73e000 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
-import 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
+import 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
@@ -47,7 +47,7 @@ public class SessionJobObserverTest {
     private final FlinkConfigManager configManager = new 
FlinkConfigManager(new Configuration());
     private final TestingFlinkService flinkService = new TestingFlinkService();
     private SessionJobObserver observer;
-    private FlinkSessionJobReconciler reconciler;
+    private SessionJobReconciler reconciler;
 
     @BeforeEach
     public void before() {
@@ -55,7 +55,9 @@ public class SessionJobObserverTest {
         TestingStatusRecorder<FlinkSessionJobStatus> statusRecorder = new 
TestingStatusRecorder<>();
         observer =
                 new SessionJobObserver(flinkService, configManager, 
statusRecorder, eventRecorder);
-        reconciler = new FlinkSessionJobReconciler(kubernetesClient, 
flinkService, configManager);
+        reconciler =
+                new SessionJobReconciler(
+                        kubernetesClient, flinkService, configManager, 
eventRecorder);
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ca9d819..18c847c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptio
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -489,21 +488,18 @@ public class ApplicationReconcilerTest {
         ObjectMeta deployMeta = flinkApp.getMetadata();
         FlinkDeploymentStatus status = flinkApp.getStatus();
         FlinkDeploymentSpec spec = flinkApp.getSpec();
-        JobSpec jobSpec = spec.getJob();
         Configuration deployConfig = configManager.getDeployConfig(deployMeta, 
spec);
 
         
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        reconciler.deployFlinkJob(
-                deployMeta, jobSpec, status, deployConfig, Optional.empty(), 
false);
+        reconciler.deploy(deployMeta, spec, status, deployConfig, 
Optional.empty(), false);
 
         String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path1.startsWith(haStoragePath));
 
         
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        reconciler.deployFlinkJob(
-                deployMeta, jobSpec, status, deployConfig, Optional.empty(), 
false);
+        reconciler.deploy(deployMeta, spec, status, deployConfig, 
Optional.empty(), false);
         String path2 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path2.startsWith(haStoragePath));
         assertNotEquals(path1, path2);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
similarity index 78%
rename from 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
rename to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 8167292..6d1c414 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
@@ -29,14 +30,18 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -45,132 +50,132 @@ import static org.junit.Assert.assertNull;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Tests for {@link FlinkSessionJobReconciler}. */
-public class FlinkSessionJobReconcilerTest {
+/** Tests for {@link SessionJobReconciler}. */
+public class SessionJobReconcilerTest {
 
     private final FlinkConfigManager configManager = new 
FlinkConfigManager(new Configuration());
+    private TestingFlinkService flinkService = new TestingFlinkService();
+    private SessionJobReconciler reconciler;
+
+    @BeforeEach
+    public void before() {
+        var eventRecorder = new EventRecorder(null, (r, e) -> {});
+        reconciler = new SessionJobReconciler(null, flinkService, 
configManager, eventRecorder);
+    }
 
     @Test
     public void testSubmitAndCleanUp() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
 
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
         // session not found
         reconciler.reconcile(sessionJob, TestUtils.createEmptyContext());
-        assertEquals(0, flinkService.listSessionJobs().size());
+        assertEquals(0, flinkService.listJobs().size());
 
         // session not ready
         reconciler.reconcile(sessionJob, 
TestUtils.createContextWithNotReadyFlinkDeployment());
-        assertEquals(0, flinkService.listSessionJobs().size());
+        assertEquals(0, flinkService.listJobs().size());
 
         // session ready
         reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
-        assertEquals(1, flinkService.listSessionJobs().size());
+        assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
         // clean up
         reconciler.cleanup(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
-        assertEquals(0, flinkService.listSessionJobs().size());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
     }
 
     @Test
     public void testRestart() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
 
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
         // session ready
         reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
-        assertEquals(1, flinkService.listSessionJobs().size());
+        assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
         sessionJob.getSpec().setRestartNonce(2L);
         reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
-        assertEquals(0, flinkService.listSessionJobs().size());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
         reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
     }
 
     @Test
     public void testSubmitWithInitialSavepointPath() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
 
         var initSavepointPath = "file:///init-sp";
         
sessionJob.getSpec().getJob().setInitialSavepointPath(initSavepointPath);
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
         reconciler.reconcile(sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment());
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 initSavepointPath,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
     }
 
     @Test
     public void testStatelessUpgrade() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
 
         var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
         reconciler.reconcile(sessionJob, readyContext);
-        assertEquals(1, flinkService.listSessionJobs().size());
+        assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
 
         var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
         
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         statelessSessionJob.getSpec().getJob().setParallelism(2);
         // job suspended first
         reconciler.reconcile(statelessSessionJob, readyContext);
-        assertTrue(flinkService.listSessionJobs().isEmpty());
-        verifyJobState(statelessSessionJob, JobState.SUSPENDED, 
JobState.SUSPENDED.name());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
 
+        flinkService.clear();
         reconciler.reconcile(statelessSessionJob, readyContext);
-        assertEquals(1, flinkService.listSessionJobs().size());
+        assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
                 statelessSessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
     }
 
     @Test
     public void testSavepointUpgrade() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
 
         var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
         reconciler.reconcile(sessionJob, readyContext);
         // start the job
-        assertEquals(1, flinkService.listSessionJobs().size());
+        assertEquals(1, flinkService.listJobs().size());
         assertTrue(
                 sessionJob
                         .getStatus()
@@ -183,11 +188,21 @@ public class FlinkSessionJobReconcilerTest {
         var statefulSessionJob = ReconciliationUtils.clone(sessionJob);
         
statefulSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         statefulSessionJob.getSpec().getJob().setParallelism(3);
+
+        verifyAndSetRunningJobsToStatus(
+                statefulSessionJob,
+                JobState.RUNNING,
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                null,
+                flinkService.listJobs());
+
         reconciler.reconcile(statefulSessionJob, readyContext);
 
         // job suspended first
-        assertTrue(flinkService.listSessionJobs().isEmpty());
-        verifyJobState(statefulSessionJob, JobState.SUSPENDED, 
JobState.SUSPENDED.name());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statefulSessionJob, JobState.SUSPENDED, "FINISHED");
         assertEquals(
                 1,
                 statefulSessionJob
@@ -205,52 +220,31 @@ public class FlinkSessionJobReconcilerTest {
                         .getLastSavepoint()
                         .getTriggerType());
 
+        flinkService.clear();
         // upgraded
         reconciler.reconcile(statefulSessionJob, readyContext);
-        assertEquals(1, flinkService.listSessionJobs().size());
+        assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
                 statefulSessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 "savepoint_0",
-                flinkService.listSessionJobs());
-    }
-
-    @Test
-    public void testUseTheEffectiveConfigToSubmit() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
-        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
-
-        var readyContext =
-                TestUtils.createContextWithReadyFlinkDeployment(Map.of("key", 
"newValue"));
-
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
-
-        reconciler.reconcile(sessionJob, readyContext);
-
-        assertEquals(1, flinkService.listSessionJobs().size());
-        var submittedJob =
-                verifyAndReturnTheSubmittedJob(sessionJob, 
flinkService.listSessionJobs());
-        assertEquals("newValue", submittedJob.effectiveConfig.getString("key", 
null));
+                flinkService.listJobs());
     }
 
     @Test
     public void testTriggerSavepoint() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
         
assertFalse(SavepointUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
 
         var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, 
configManager);
         reconciler.reconcile(sessionJob, readyContext);
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
 
         
assertFalse(SavepointUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
 
@@ -344,7 +338,7 @@ public class FlinkSessionJobReconcilerTest {
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
         ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
@@ -367,11 +361,11 @@ public class FlinkSessionJobReconcilerTest {
         
assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
     }
 
-    private TestingFlinkService.SubmittedJobInfo 
verifyAndReturnTheSubmittedJob(
-            FlinkSessionJob sessionJob,
-            Map<JobID, TestingFlinkService.SubmittedJobInfo> sessionJobs) {
+    private Tuple2<String, JobStatusMessage> verifyAndReturnTheSubmittedJob(
+            FlinkSessionJob sessionJob, List<Tuple2<String, JobStatusMessage>> 
jobs) {
         var jobID = 
JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId());
-        var submittedJobInfo = sessionJobs.get(jobID);
+        var submittedJobInfo =
+                jobs.stream().filter(t -> 
t.f1.getJobId().equals(jobID)).findAny().get();
         Assertions.assertNotNull(submittedJobInfo);
         return submittedJobInfo;
     }
@@ -381,14 +375,14 @@ public class FlinkSessionJobReconcilerTest {
             JobState expectedState,
             String jobStatusObserved,
             @Nullable String expectedSavepointPath,
-            Map<JobID, TestingFlinkService.SubmittedJobInfo> sessionJobs) {
+            List<Tuple2<String, JobStatusMessage>> jobs) {
 
-        var submittedJobInfo = verifyAndReturnTheSubmittedJob(sessionJob, sessionJobs);
-        assertEquals(expectedSavepointPath, submittedJobInfo.savepointPath);
+        var submittedJobInfo = verifyAndReturnTheSubmittedJob(sessionJob, jobs);
+        assertEquals(expectedSavepointPath, submittedJobInfo.f0);
 
         verifyJobState(sessionJob, expectedState, jobStatusObserved);
         JobStatus jobStatus = sessionJob.getStatus().getJobStatus();
-        jobStatus.setJobName(submittedJobInfo.jobStatusMessage.getJobName());
+        jobStatus.setJobName(submittedJobInfo.f1.getJobName());
         jobStatus.setState("RUNNING");
     }
 
@@ -409,9 +403,6 @@ public class FlinkSessionJobReconcilerTest {
     @Test
     public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
         Context readyContext = TestUtils.createContextWithReadyFlinkDeployment();
-        TestingFlinkService flinkService = new TestingFlinkService();
-        FlinkSessionJobReconciler reconciler =
-                new FlinkSessionJobReconciler(null, flinkService, configManager);
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
         reconciler.reconcile(sessionJob, readyContext);
         verifyAndSetRunningJobsToStatus(
@@ -419,7 +410,7 @@ public class FlinkSessionJobReconcilerTest {
                 JobState.RUNNING,
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 null,
-                flinkService.listSessionJobs());
+                flinkService.listJobs());
 
         FlinkSessionJob spSessionJob = ReconciliationUtils.clone(sessionJob);
         spSessionJob
@@ -439,12 +430,14 @@ public class FlinkSessionJobReconcilerTest {
                                         .key(),
                                 "true")));
         // Force upgrade when savepoint is in progress.
-        reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
+        reconciler =
+                new SessionJobReconciler(
+                        null, flinkService, configManager, new EventRecorder(null, null));
         spSessionJob.getSpec().getJob().setParallelism(100);
         reconciler.reconcile(spSessionJob, readyContext);
         assertEquals(
                 "trigger_0",
                 spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        assertEquals(JobState.SUSPENDED.name(), spSessionJob.getStatus().getJobStatus().getState());
+        assertEquals("FINISHED", spSessionJob.getStatus().getJobStatus().getState());
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index f5afb2b..4b515aa 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class FlinkServiceTest {
     KubernetesClient client;
     private final Configuration configuration = new Configuration();
+    private final FlinkConfigManager configManager = new FlinkConfigManager(configuration);
 
     @BeforeEach
     public void setup() {
@@ -98,7 +99,8 @@ public class FlinkServiceTest {
 
         deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         deployment.getStatus().getJobStatus().setState("RUNNING");
-        flinkService.cancelJob(deployment, UpgradeMode.STATELESS);
+        flinkService.cancelJob(
+                deployment, UpgradeMode.STATELESS, configManager.getObserveConfig(deployment));
         assertTrue(cancelFuture.isDone());
         assertEquals(jobID, cancelFuture.get());
         assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
@@ -134,7 +136,8 @@ public class FlinkServiceTest {
         ReconciliationUtils.updateForSpecReconciliationSuccess(
                 deployment, JobState.RUNNING, new Configuration());
 
-        flinkService.cancelJob(deployment, UpgradeMode.SAVEPOINT);
+        flinkService.cancelJob(
+                deployment, UpgradeMode.SAVEPOINT, configManager.getObserveConfig(deployment));
         assertTrue(stopWithSavepointFuture.isDone());
         assertEquals(jobID, stopWithSavepointFuture.get().f0);
         assertFalse(stopWithSavepointFuture.get().f1);
@@ -166,7 +169,8 @@ public class FlinkServiceTest {
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
 
-        flinkService.cancelJob(deployment, UpgradeMode.LAST_STATE);
+        flinkService.cancelJob(
+                deployment, UpgradeMode.LAST_STATE, configManager.getObserveConfig(deployment));
         assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
         assertNull(
                 client.apps()

Reply via email to