This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e4a6faa  [FLINK-26787] Initial implementation of FlinkSessionJobController and reconciler
e4a6faa is described below

commit e4a6faae6102779e895113cd1584c9f4fef8cad4
Author: Aitozi <[email protected]>
AuthorDate: Sun Apr 3 22:28:17 2022 +0800

    [FLINK-26787] Initial implementation of FlinkSessionJobController and reconciler
---
 docs/content/docs/custom-resource/reference.md     |  17 +-
 docs/content/docs/development/guide.md             |   8 +
 examples/basic-session-job.yaml                    |   2 +-
 flink-kubernetes-operator/pom.xml                  |   6 +
 .../flink/kubernetes/operator/FlinkOperator.java   |  82 ++++---
 .../JarResolver.java}                              |  26 ++-
 .../config/FlinkOperatorConfiguration.java         |   7 +-
 .../operator/config/OperatorConfigOptions.java     |   6 +
 .../operator/controller/FlinkControllerConfig.java |   9 +-
 .../controller/FlinkDeploymentController.java      |  24 +-
 .../controller/FlinkSessionJobController.java      | 245 +++++++++++++++++++++
 .../operator/crd/status/FlinkDeploymentStatus.java |   1 -
 .../FlinkSessionJobReconciliationStatus.java       |   2 +-
 .../status}/JobManagerDeploymentStatus.java        |   2 +-
 .../kubernetes/operator/observer/Observer.java     |  10 +-
 .../operator/observer/SavepointFetchResult.java    |   6 +-
 .../AbstractDeploymentObserver.java}               |  21 +-
 .../ApplicationObserver.java}                      |  20 +-
 .../observer/{ => deployment}/ObserverFactory.java |  23 +-
 .../observer/{ => deployment}/SessionObserver.java |  17 +-
 .../SessionJobObserver.java}                       |  19 +-
 .../kubernetes/operator/reconciler/Reconciler.java |  29 +--
 .../operator/reconciler/ReconciliationUtils.java   |  40 +++-
 .../AbstractDeploymentReconciler.java}             |  13 +-
 .../ApplicationReconciler.java}                    |  13 +-
 .../{ => deployment}/ReconcilerFactory.java        |  22 +-
 .../{ => deployment}/SessionReconciler.java        |   7 +-
 .../sessionjob/FlinkSessionJobReconciler.java      | 126 +++++++++++
 .../kubernetes/operator/service/FlinkService.java  | 111 +++++++++-
 .../kubernetes/operator/utils/OperatorUtils.java   |  16 ++
 ...loymentValidator.java => DefaultValidator.java} |  47 +++-
 ...tValidator.java => FlinkResourceValidator.java} |  17 +-
 .../flink/kubernetes/operator/TestUtils.java       |  62 ++++++
 .../kubernetes/operator/TestingFlinkService.java   |  20 ++
 .../operator/artifact/JarResolverTest.java}        |  25 ++-
 .../controller/FlinkDeploymentControllerTest.java  |  15 +-
 ...erverTest.java => ApplicationObserverTest.java} |  22 +-
 .../operator/observer/SessionObserverTest.java     |   2 +
 .../ApplicationReconcilerTest.java}                |  28 ++-
 .../{ => deployment}/SessionReconcilerTest.java    |   6 +-
 .../sessionjob/FlinkSessionJobReconcilerTest.java  |  61 +++++
 .../operator/utils/ReconciliationUtilsTest.java    |   2 +-
 ...alidatorTest.java => DefaultValidatorTest.java} |  79 ++++++-
 .../operator/admission/FlinkOperatorWebhook.java   |   4 +-
 .../operator/admission/FlinkValidator.java         |   8 +-
 .../operator/admission/AdmissionHandlerTest.java   |   4 +-
 .../conf/flink-operator-config/log4j2.properties   |   2 +-
 .../crds/flinksessionjobs.flink.apache.org-v1.yml  |   2 +-
 .../templates/flink-operator.yaml                  |   6 +
 helm/flink-kubernetes-operator/templates/rbac.yaml |   2 +
 helm/flink-kubernetes-operator/values.yaml         |  17 ++
 51 files changed, 1121 insertions(+), 240 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 9880eec..3a7cf29 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -168,7 +168,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | Parameter | Type | Docs |
 | ----------| ---- | ---- |
 | jobStatus | org.apache.flink.kubernetes.operator.crd.status.JobStatus | Last 
observed status of the Flink job on Application deployments. |
-| jobManagerDeploymentStatus | 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus | Last 
observed status of the JobManager deployment. |
+| jobManagerDeploymentStatus | 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus | 
Last observed status of the JobManager deployment. |
 | reconciliationStatus | 
org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus | Status 
of the last reconcile operation. |
 
 ### FlinkSessionJobReconciliationStatus
@@ -180,7 +180,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | ----------| ---- | ---- |
 | success | boolean | True if last reconciliation step was successful. |
 | error | java.lang.String | If success == false, error information about the 
reconciliation failure. |
-| flinkSessionJobSpec | 
org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec | Last 
reconciled job spec. Used to decide whether further reconciliation steps are 
necessary. |
+| lastReconciledSpec | 
org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec | Last 
reconciled job spec. Used to decide whether further reconciliation steps are 
necessary. |
 
 ### FlinkSessionJobStatus
 **Class**: 
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus
@@ -192,6 +192,19 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | jobStatus | org.apache.flink.kubernetes.operator.crd.status.JobStatus | Last 
observed status of the job. |
 | reconciliationStatus | 
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus
 | Status of the last reconcile operation. |
 
+### JobManagerDeploymentStatus
+**Class**: 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus
+
+**Description**: Status of the Flink JobManager Kubernetes deployment.
+
+| Value | Docs |
+| ----- | ---- |
+| READY | JobManager is running and ready to receive REST API calls. |
+| DEPLOYED_NOT_READY | JobManager is running but not ready yet to receive REST 
API calls. |
+| DEPLOYING | JobManager process is starting up. |
+| MISSING | JobManager deployment not found, probably not started or killed by 
user. |
+| ERROR | Deployment in terminal error, requires spec change for 
reconciliation to continue. |
+
 ### JobStatus
 **Class**: org.apache.flink.kubernetes.operator.crd.status.JobStatus
 
diff --git a/docs/content/docs/development/guide.md 
b/docs/content/docs/development/guide.md
index 643aa29..d819425 100644
--- a/docs/content/docs/development/guide.md
+++ b/docs/content/docs/development/guide.md
@@ -108,6 +108,14 @@ So if the CRD is changed, you have to delete the CRD 
resource manually, and re-i
 kubectl delete crd flinkdeployments.flink.apache.org
 ```
 
+### Mounts
+
+The operator supports specifying volume mounts. The default hostPath mounts can be activated with the following command. You can change the default mounts in `helm/flink-operator/values.yaml`.
+
+```bash
+helm install flink-operator helm/flink-operator --set operatorVolumeMounts.create=true --set operatorVolumes.create=true
+```
+
 
 ## CI/CD
 We use [GitHub 
Actions](https://help.github.com/en/actions/getting-started-with-github-actions/about-github-actions)
 to help you automate your software development workflows in the same place you 
store code and collaborate on pull requests and issues.
diff --git a/examples/basic-session-job.yaml b/examples/basic-session-job.yaml
index e2b2401..cfd5e4f 100644
--- a/examples/basic-session-job.yaml
+++ b/examples/basic-session-job.yaml
@@ -24,6 +24,6 @@ metadata:
 spec:
   clusterId: basic-session-example
   job:
-    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    jarURI: file:///opt/flink/artifacts/TopSpeedWindowing.jar
     parallelism: 2
     upgradeMode: stateless
diff --git a/flink-kubernetes-operator/pom.xml 
b/flink-kubernetes-operator/pom.xml
index cde7633..1ca0e66 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -71,6 +71,12 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-kubernetes-shaded</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 02757a6..6036f9a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -22,13 +22,20 @@ import 
org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
+import 
org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
-import 
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -46,54 +53,76 @@ public class FlinkOperator {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkOperator.class);
 
     private final Operator operator;
-    private final FlinkDeploymentController controller;
-    private final FlinkControllerConfig controllerConfig;
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient client;
+    private final FlinkService flinkService;
+    private final ConfigurationService configurationService;
+    private final DefaultConfig defaultConfig;
 
     public FlinkOperator() {
         this(FlinkUtils.loadDefaultConfig());
     }
 
     public FlinkOperator(DefaultConfig defaultConfig) {
-
         LOG.info("Starting Flink Kubernetes Operator");
         
OperatorMetricUtils.initOperatorMetrics(defaultConfig.getOperatorConfig());
 
-        KubernetesClient client = new DefaultKubernetesClient();
-        String namespace = client.getNamespace();
-        if (namespace == null) {
-            namespace = "default";
-        }
-
-        FlinkOperatorConfiguration operatorConfiguration =
+        this.defaultConfig = defaultConfig;
+        this.client = new DefaultKubernetesClient();
+        this.operatorConfiguration =
                 
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
+        this.configurationService = 
getConfigurationService(operatorConfiguration);
+        this.operator = new Operator(client, configurationService);
+        this.flinkService = new FlinkService(client, operatorConfiguration);
+    }
 
-        ConfigurationService configurationService = 
getConfigurationService(operatorConfiguration);
-
-        operator = new Operator(client, configurationService);
-
-        FlinkService flinkService = new FlinkService(client, 
operatorConfiguration);
-
-        FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
+    private void registerDeploymentController() {
+        FlinkResourceValidator validator = new DefaultValidator();
         ReconcilerFactory reconcilerFactory =
                 new ReconcilerFactory(client, flinkService, 
operatorConfiguration);
         ObserverFactory observerFactory =
                 new ObserverFactory(
                         flinkService, operatorConfiguration, 
defaultConfig.getFlinkConfig());
 
-        controller =
+        FlinkDeploymentController controller =
                 new FlinkDeploymentController(
                         defaultConfig,
                         operatorConfiguration,
                         client,
-                        namespace,
                         validator,
                         reconcilerFactory,
                         observerFactory);
 
-        controllerConfig =
-                new FlinkControllerConfig(controller, 
operatorConfiguration.getWatchedNamespaces());
+        FlinkControllerConfig<FlinkDeployment> controllerConfig =
+                new FlinkControllerConfig<>(
+                        controller, 
operatorConfiguration.getWatchedNamespaces());
         controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
+        operator.register(controller, controllerConfig);
+    }
+
+    private void registerSessionJobController() {
+        FlinkResourceValidator validator = new DefaultValidator();
+        Reconciler<FlinkSessionJob> reconciler =
+                new FlinkSessionJobReconciler(client, flinkService, 
operatorConfiguration);
+        Observer<FlinkSessionJob> observer = new SessionJobObserver();
+        FlinkSessionJobController controller =
+                new FlinkSessionJobController(
+                        defaultConfig,
+                        operatorConfiguration,
+                        client,
+                        validator,
+                        reconciler,
+                        observer);
+
+        FlinkControllerConfig<FlinkSessionJob> controllerConfig =
+                new FlinkControllerConfig<>(
+                        controller, 
operatorConfiguration.getWatchedNamespaces());
+
+        controllerConfig.setConfigurationService(configurationService);
+        controller.init(controllerConfig);
+        operator.register(controller, controllerConfig);
     }
 
     private ConfigurationService getConfigurationService(
@@ -115,7 +144,8 @@ public class FlinkOperator {
     }
 
     public void run() {
-        operator.register(controller, controllerConfig);
+        registerDeploymentController();
+        registerSessionJobController();
         operator.installShutdownHook();
         operator.start();
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
similarity index 61%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
index 732224d..bf60bf6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/JarResolver.java
@@ -15,20 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.validation;
+package org.apache.flink.kubernetes.operator.artifact;
 
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.util.FlinkRuntimeException;
 
-import java.util.Optional;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/** Resolve the jar uri. */
+public class JarResolver {
 
-    /**
-     * Validate and return optional error.
-     *
-     * @param deployment
-     * @return Optional error string, should be present iff validation 
resulted in an error
-     */
-    Optional<String> validate(FlinkDeployment deployment);
+    public Path resolve(String jarURI) throws Exception {
+        URI uri = new URI(jarURI);
+        if (!"file".equals(uri.getScheme())) {
+            throw new FlinkRuntimeException("Only support local jar now.");
+        }
+        return Paths.get(uri.getPath());
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 9b27745..4cea133 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -39,6 +39,7 @@ public class FlinkOperatorConfiguration {
     Duration flinkClientTimeout;
     String flinkServiceHostOverride;
     Set<String> watchedNamespaces;
+    Duration cancelJobTimeout;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         Set<String> watchedNamespaces = OperatorUtils.getWatchedNamespaces();
@@ -67,6 +68,9 @@ public class FlinkOperatorConfiguration {
         Duration flinkClientTimeout =
                 
operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT);
 
+        Duration cancelJobTimeout =
+                
operatorConfig.get(OperatorConfigOptions.OPERATOR_CANCEL_JOB_TIMEOUT);
+
         String flinkServiceHostOverride = null;
         if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
             // not running in k8s, simplify local development
@@ -81,6 +85,7 @@ public class FlinkOperatorConfiguration {
                 savepointTriggerGracePeriod,
                 flinkClientTimeout,
                 flinkServiceHostOverride,
-                watchedNamespaces);
+                watchedNamespaces,
+                cancelJobTimeout);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 6d7642a..40058bd 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -69,4 +69,10 @@ public class OperatorConfigOptions {
                     .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The timeout for the observer to wait the flink 
rest client to return.");
+
+    public static final ConfigOption<Duration> OPERATOR_CANCEL_JOB_TIMEOUT =
+            ConfigOptions.key("operator.reconciler.flink.cancel.job.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription("The timeout for the operator to cancel 
job.");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
index 49b2b27..2a55456 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
@@ -17,19 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration;
 
 import java.util.Set;
 
 /** Custom config for {@link FlinkDeploymentController}. */
-public class FlinkControllerConfig extends 
AnnotationConfiguration<FlinkDeployment> {
+public class FlinkControllerConfig<CR extends HasMetadata> extends 
AnnotationConfiguration<CR> {
 
     private final Set<String> watchedNamespaces;
 
-    public FlinkControllerConfig(
-            FlinkDeploymentController reconciler, Set<String> 
watchedNamespaces) {
+    public FlinkControllerConfig(Reconciler<CR> reconciler, Set<String> 
watchedNamespaces) {
         super(reconciler);
         this.watchedNamespaces = watchedNamespaces;
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 0067674..8ebeb50 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -21,15 +21,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
-import 
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 import org.apache.flink.util.Preconditions;
 
 import io.fabric8.kubernetes.api.model.Event;
@@ -62,28 +62,24 @@ public class FlinkDeploymentController
 
     private final KubernetesClient kubernetesClient;
 
-    private final String operatorNamespace;
-
-    private final FlinkDeploymentValidator validator;
+    private final FlinkResourceValidator validator;
     private final ReconcilerFactory reconcilerFactory;
     private final ObserverFactory observerFactory;
     private final DefaultConfig defaultConfig;
     private final FlinkOperatorConfiguration operatorConfiguration;
 
-    private FlinkControllerConfig controllerConfig;
+    private FlinkControllerConfig<FlinkDeployment> controllerConfig;
 
     public FlinkDeploymentController(
             DefaultConfig defaultConfig,
             FlinkOperatorConfiguration operatorConfiguration,
             KubernetesClient kubernetesClient,
-            String operatorNamespace,
-            FlinkDeploymentValidator validator,
+            FlinkResourceValidator validator,
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory) {
         this.defaultConfig = defaultConfig;
         this.operatorConfiguration = operatorConfiguration;
         this.kubernetesClient = kubernetesClient;
-        this.operatorNamespace = operatorNamespace;
         this.validator = validator;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
@@ -99,7 +95,7 @@ public class FlinkDeploymentController
         }
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
-        return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, 
effectiveConfig);
+        return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, 
context, effectiveConfig);
     }
 
     @Override
@@ -108,7 +104,7 @@ public class FlinkDeploymentController
         FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
             observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
-            Optional<String> validationError = validator.validate(flinkApp);
+            Optional<String> validationError = 
validator.validateDeployment(flinkApp);
             if (validationError.isPresent()) {
                 LOG.error("Validation failed: " + validationError.get());
                 ReconciliationUtils.updateForReconciliationError(flinkApp, 
validationError.get());
@@ -170,7 +166,7 @@ public class FlinkDeploymentController
         return Optional.of(flinkApp);
     }
 
-    public void setControllerConfig(FlinkControllerConfig config) {
+    public void setControllerConfig(FlinkControllerConfig<FlinkDeployment> 
config) {
         this.controllerConfig = config;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
new file mode 100644
index 0000000..1e563fc
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import 
io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
+import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Controller that runs the main reconcile loop for {@link FlinkSessionJob}. 
*/
+@ControllerConfiguration
+public class FlinkSessionJobController
+        implements 
io.javaoperatorsdk.operator.api.reconciler.Reconciler<FlinkSessionJob>,
+                ErrorStatusHandler<FlinkSessionJob>,
+                EventSourceInitializer<FlinkSessionJob> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSessionJobController.class);
+    private static final String CLUSTER_ID_INDEX = "clusterId_index";
+    private static final String ALL_NAMESPACE = "allNamespace";
+
+    private final KubernetesClient kubernetesClient;
+
+    private final FlinkResourceValidator validator;
+    private final Reconciler<FlinkSessionJob> reconciler;
+    private final Observer<FlinkSessionJob> observer;
+    private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private Map<String, SharedIndexInformer<FlinkSessionJob>> informers;
+    private FlinkControllerConfig<FlinkSessionJob> controllerConfig;
+
+    /**
+     * Creates the controller and stores its collaborators.
+     *
+     * <p>The session job informers and the controller config are not set here; they are
+     * assigned later by {@link #init(FlinkControllerConfig)}.
+     *
+     * @param defaultConfig default configuration; its Flink config is passed to the reconciler
+     * @param operatorConfiguration operator settings (e.g. the reconcile interval)
+     * @param kubernetesClient client used to build the informers
+     * @param validator validator applied to each session job before reconciliation
+     * @param reconciler reconciler for {@link FlinkSessionJob} resources
+     * @param observer observer for {@link FlinkSessionJob} resources
+     */
+    public FlinkSessionJobController(
+            DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
+            KubernetesClient kubernetesClient,
+            FlinkResourceValidator validator,
+            Reconciler<FlinkSessionJob> reconciler,
+            Observer<FlinkSessionJob> observer) {
+        this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
+        this.kubernetesClient = kubernetesClient;
+        this.validator = validator;
+        this.reconciler = reconciler;
+        this.observer = observer;
+    }
+
+    /**
+     * Stores the controller config and builds the session job informers from it.
+     *
+     * <p>Must be called before {@link #prepareEventSources(EventSourceContext)}, which rejects a
+     * null controller config.
+     *
+     * @param config controller config providing the effective namespaces
+     */
+    public void init(FlinkControllerConfig<FlinkSessionJob> config) {
+        this.controllerConfig = config;
+        this.informers = createInformers();
+    }
+
+    /**
+     * Runs one reconciliation pass for a session job: observe, validate, then reconcile.
+     *
+     * @param flinkSessionJob the session job being reconciled; its status may be mutated
+     * @param context the context with which the operation is executed
+     * @return the update control computed from the pre-reconciliation copy and the current
+     *     resource; rescheduled after the configured reconcile interval unless validation failed
+     * @throws ReconciliationException wrapping any exception thrown by the reconciler
+     */
+    @Override
+    public UpdateControl<FlinkSessionJob> reconcile(
+            FlinkSessionJob flinkSessionJob, Context context) {
+        LOG.info("Starting reconciliation");
+        // Snapshot taken before any mutation; passed to toUpdateControl together with the
+        // (possibly modified) resource.
+        FlinkSessionJob originalCopy = 
ReconciliationUtils.clone(flinkSessionJob);
+        observer.observe(flinkSessionJob, context);
+        // Validation also receives the secondary resource looked up for this session job.
+        Optional<String> validationError =
+                validator.validateSessionJob(
+                        flinkSessionJob,
+                        OperatorUtils.getSecondaryResource(
+                                flinkSessionJob, context, 
operatorConfiguration));
+        if (validationError.isPresent()) {
+            // Record the error and return without invoking the reconciler; note that this
+            // path does not call rescheduleAfter.
+            LOG.error("Validation failed: " + validationError.get());
+            ReconciliationUtils.updateForReconciliationError(
+                    flinkSessionJob, validationError.get());
+            return ReconciliationUtils.toUpdateControl(originalCopy, 
flinkSessionJob);
+        }
+
+        try {
+            // TODO refactor the reconciler interface to return UpdateControl 
directly
+            reconciler.reconcile(flinkSessionJob, context, 
defaultConfig.getFlinkConfig());
+        } catch (Exception e) {
+            // Wrapped so that updateErrorStatus can report the underlying cause.
+            throw new ReconciliationException(e);
+        }
+
+        return ReconciliationUtils.toUpdateControl(originalCopy, 
flinkSessionJob)
+                
.rescheduleAfter(operatorConfiguration.getReconcileInterval().toMillis());
+    }
+
+    /**
+     * Handles deletion of a session job by delegating to the reconciler's cleanup with the
+     * default Flink config.
+     *
+     * @param sessionJob the session job being deleted
+     * @param context the context with which the operation is executed
+     * @return the {@link DeleteControl} returned by the reconciler
+     */
+    @Override
+    public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
+        LOG.info("Deleting FlinkSessionJob");
+
+        return reconciler.cleanup(sessionJob, context, 
defaultConfig.getFlinkConfig());
+    }
+
+    /**
+     * Records a reconciliation failure on the session job.
+     *
+     * @param flinkSessionJob the session job whose reconciliation failed
+     * @param retryInfo retry information; only logged
+     * @param e the failure; for a {@link ReconciliationException} the cause is reported instead
+     *     of the wrapper
+     * @return the session job carrying the recorded error
+     */
+    @Override
+    public Optional<FlinkSessionJob> updateErrorStatus(
+            FlinkSessionJob flinkSessionJob, RetryInfo retryInfo, 
RuntimeException e) {
+        LOG.warn(
+                "Attempt count: {}, last attempt: {}",
+                retryInfo.getAttemptCount(),
+                retryInfo.isLastAttempt());
+
+        // NOTE(review): e.getCause() is dereferenced without a null check; this relies on
+        // every ReconciliationException being created with a cause, as reconcile() does.
+        ReconciliationUtils.updateForReconciliationError(
+                flinkSessionJob,
+                (e instanceof ReconciliationException) ? 
e.getCause().toString() : e.toString());
+        return Optional.of(flinkSessionJob);
+    }
+
+    /**
+     * Builds the {@link FlinkDeployment} event sources for this controller: a single one named
+     * {@code ALL_NAMESPACE} when no effective namespaces are configured, otherwise one per
+     * effective namespace, named after that namespace.
+     *
+     * @param eventSourceContext the event source context; not used by this implementation
+     * @return the event sources to register
+     */
+    @Override
+    public List<EventSource> prepareEventSources(
+            EventSourceContext<FlinkSessionJob> eventSourceContext) {
+        // controllerConfig is only assigned in init(), so fail fast if init() was skipped.
+        Preconditions.checkNotNull(controllerConfig, "Controller config cannot 
be null");
+        Set<String> effectiveNamespaces = 
controllerConfig.getEffectiveNamespaces();
+        if (effectiveNamespaces.isEmpty()) {
+            return List.of(createFlinkDepInformerEventSource(ALL_NAMESPACE));
+        } else {
+            return effectiveNamespaces.stream()
+                    .map(this::createFlinkDepInformerEventSource)
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Creates an event source that watches {@link FlinkDeployment} resources and maps them to
+     * the {@link FlinkSessionJob}s referencing them.
+     *
+     * <p>The informer is scoped to the namespace given by {@code name}, or to all namespaces
+     * when {@code name} is {@code ALL_NAMESPACE}. Previously the informer was created without
+     * any namespace scoping, so every per-namespace event source watched the same default
+     * scope and {@code name} only affected the event source name.
+     *
+     * @param name the event source name: either {@code ALL_NAMESPACE} or the namespace to watch
+     * @return the informer event source for session cluster deployments
+     */
+    private InformerEventSource<FlinkDeployment, FlinkSessionJob> createFlinkDepInformerEventSource(
+            String name) {
+        var deployments = kubernetesClient.resources(FlinkDeployment.class);
+        // Mirror createInformers(): cluster-wide for the ALL_NAMESPACE sentinel, otherwise
+        // restricted to the named namespace. The sentinel contains upper-case characters, so
+        // it should not clash with a real namespace name.
+        var deploymentInformer =
+                ALL_NAMESPACE.equals(name)
+                        ? deployments.inAnyNamespace().runnableInformer(0)
+                        : deployments.inNamespace(name).runnableInformer(0);
+        return new InformerEventSource<>(
+                deploymentInformer,
+                primaryResourceRetriever(),
+                // A session job's secondary resource is the deployment named by its clusterId
+                // in the session job's own namespace.
+                sessionJob ->
+                        new ResourceID(
+                                sessionJob.getSpec().getClusterId(),
+                                sessionJob.getMetadata().getNamespace()),
+                false) {
+            @Override
+            public String name() {
+                return name;
+            }
+        };
+    }
+
+    /**
+     * Maps a {@link FlinkDeployment} session cluster to the {@link FlinkSessionJob}s that
+     * reference it, using the {@code CLUSTER_ID_INDEX} index of the session job informers.
+     *
+     * <p>If no session job informer exists for the deployment's namespace, an empty set is
+     * returned instead of failing with a {@link NullPointerException}.
+     *
+     * @return The {@link PrimaryResourcesRetriever}.
+     */
+    private PrimaryResourcesRetriever<FlinkDeployment> primaryResourceRetriever() {
+        return flinkDeployment -> {
+            var namespace = flinkDeployment.getMetadata().getNamespace();
+            var informer =
+                    controllerConfig.getEffectiveNamespaces().isEmpty()
+                            ? informers.get(ALL_NAMESPACE)
+                            : informers.get(namespace);
+            if (informer == null) {
+                // The deployment lives in a namespace without a session job informer, so
+                // there is nothing to map it to.
+                return new HashSet<ResourceID>();
+            }
+
+            // Session jobs are indexed by their spec.clusterId, which is the deployment name.
+            var sessionJobs =
+                    informer.getIndexer()
+                            .byIndex(CLUSTER_ID_INDEX, flinkDeployment.getMetadata().getName());
+            var resourceIDs = new HashSet<ResourceID>();
+            for (FlinkSessionJob sessionJob : sessionJobs) {
+                resourceIDs.add(
+                        new ResourceID(
+                                sessionJob.getMetadata().getName(),
+                                sessionJob.getMetadata().getNamespace()));
+            }
+            LOG.debug(
+                    "Find the target resource {} for {} ",
+                    resourceIDs,
+                    flinkDeployment.getMetadata().getNamespace());
+            return resourceIDs;
+        };
+    }
+
+    /**
+     * Creates the {@link FlinkSessionJob} informers, each carrying the cluster-id index that
+     * relates a session cluster to its session jobs.
+     *
+     * <p>When no effective namespaces are configured, a single informer over all namespaces is
+     * registered under the {@code ALL_NAMESPACE} key; otherwise one informer is created per
+     * effective namespace, keyed by the namespace name.
+     *
+     * @return The informers keyed by namespace (or by {@code ALL_NAMESPACE}).
+     */
+    private Map<String, SharedIndexInformer<FlinkSessionJob>> 
createInformers() {
+        Set<String> effectiveNamespaces = 
controllerConfig.getEffectiveNamespaces();
+        if (effectiveNamespaces.isEmpty()) {
+            return Map.of(
+                    ALL_NAMESPACE,
+                    kubernetesClient
+                            .resources(FlinkSessionJob.class)
+                            .inAnyNamespace()
+                            .withIndexers(clusterToSessionJobIndexer())
+                            .inform());
+        } else {
+            // This local map shadows the field of the same name; the caller assigns the
+            // returned map to the field.
+            var informers = new HashMap<String, 
SharedIndexInformer<FlinkSessionJob>>();
+            for (String effectiveNamespace : effectiveNamespaces) {
+                informers.put(
+                        effectiveNamespace,
+                        kubernetesClient
+                                .resources(FlinkSessionJob.class)
+                                .inNamespace(effectiveNamespace)
+                                .withIndexers(clusterToSessionJobIndexer())
+                                .inform());
+            }
+            return informers;
+        }
+    }
+
+    /**
+     * Builds the indexer definition that indexes each session job under its {@code clusterId}.
+     *
+     * @return a single-entry map from {@code CLUSTER_ID_INDEX} to the index function
+     */
+    // NOTE(review): List.of rejects null elements, so a session job without a clusterId would
+    // make the index function throw - confirm the spec guarantees a non-null clusterId.
+    private Map<String, Function<FlinkSessionJob, List<String>>> 
clusterToSessionJobIndexer() {
+        return Map.of(CLUSTER_ID_INDEX, sessionJob -> 
List.of(sessionJob.getSpec().getClusterId()));
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index a8b7794..4024249 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.crd.status;
 
 import org.apache.flink.annotation.Experimental;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
index b0b18c0..c310087 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkSessionJobReconciliationStatus.java
@@ -41,5 +41,5 @@ public class FlinkSessionJobReconciliationStatus {
     /**
      * Last reconciled job spec. Used to decide whether further reconciliation 
steps are necessary.
      */
-    private FlinkSessionJobSpec flinkSessionJobSpec;
+    private FlinkSessionJobSpec lastReconciledSpec;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobManagerDeploymentStatus.java
similarity index 97%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobManagerDeploymentStatus.java
index 6d50154..249fb51 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobManagerDeploymentStatus.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.crd.status;
 
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 1f7e16e..29621ea 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -17,18 +17,16 @@
 
 package org.apache.flink.kubernetes.operator.observer;
 
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
-/** The Observer of {@link FlinkDeployment}. */
-public interface Observer {
+/** The Observer of a custom resource. */
+public interface Observer<CR> {
 
     /**
      * Observe the flinkApp status, It will reflect the changed status on the 
flinkApp resource.
      *
-     * @param flinkApp the target flinkDeployment resource
+     * @param cr the target custom resource
      * @param context the context with which the operation is executed
      */
-    void observe(FlinkDeployment flinkApp, Context context);
+    void observe(CR cr, Context context);
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
index 6d4ff34..74524c0 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
@@ -24,9 +24,9 @@ import lombok.Value;
 /** Result of a fetch savepoint operation. */
 @Value
 public class SavepointFetchResult {
-    private final Savepoint savepoint;
-    private final boolean isTriggered;
-    private final String error;
+    Savepoint savepoint;
+    boolean isTriggered;
+    String error;
 
     public static SavepointFetchResult notTriggered() {
         return new SavepointFetchResult(null, false, null);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
similarity index 92%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 4afc227..e5d1bb9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -16,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -25,8 +24,10 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
@@ -46,7 +47,7 @@ import java.util.List;
 import java.util.Optional;
 
 /** The base observer. */
-public abstract class BaseObserver implements Observer {
+public abstract class AbstractDeploymentObserver implements 
Observer<FlinkDeployment> {
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -56,7 +57,7 @@ public abstract class BaseObserver implements Observer {
     protected final FlinkOperatorConfiguration operatorConfiguration;
     protected final Configuration flinkConfig;
 
-    public BaseObserver(
+    public AbstractDeploymentObserver(
             FlinkService flinkService,
             FlinkOperatorConfiguration operatorConfiguration,
             Configuration flinkConfig) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
similarity index 89%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 1c60a87..1dd54ea 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -16,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -24,6 +23,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -38,9 +38,9 @@ import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 /** The observer of {@link 
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
-public class JobObserver extends BaseObserver {
+public class ApplicationObserver extends AbstractDeploymentObserver {
 
-    public JobObserver(
+    public ApplicationObserver(
             FlinkService flinkService,
             FlinkOperatorConfiguration operatorConfiguration,
             Configuration flinkConfig) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
similarity index 70%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index a0a3566..be2d0f8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -16,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import java.util.Map;
@@ -33,7 +33,7 @@ public class ObserverFactory {
     private final FlinkService flinkService;
     private final FlinkOperatorConfiguration operatorConfig;
     private final Configuration flinkConfig;
-    private final Map<Mode, Observer> observerMap;
+    private final Map<Mode, Observer<FlinkDeployment>> observerMap;
 
     public ObserverFactory(
             FlinkService flinkService,
@@ -45,7 +45,7 @@ public class ObserverFactory {
         this.observerMap = new ConcurrentHashMap<>();
     }
 
-    public Observer getOrCreate(FlinkDeployment flinkApp) {
+    public Observer<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
         return observerMap.computeIfAbsent(
                 Mode.getMode(flinkApp),
                 mode -> {
@@ -53,7 +53,8 @@ public class ObserverFactory {
                         case SESSION:
                             return new SessionObserver(flinkService, 
operatorConfig, flinkConfig);
                         case APPLICATION:
-                            return new JobObserver(flinkService, 
operatorConfig, flinkConfig);
+                            return new ApplicationObserver(
+                                    flinkService, operatorConfig, flinkConfig);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: 
%s", mode));
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
similarity index 76%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index f74af3b..7625c9c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -16,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -28,7 +27,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import java.util.concurrent.TimeoutException;
 
 /** The observer of the {@link 
org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
-public class SessionObserver extends BaseObserver {
+public class SessionObserver extends AbstractDeploymentObserver {
 
     public SessionObserver(
             FlinkService flinkService,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
similarity index 63%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 1f7e16e..991323c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -15,20 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.observer;
+package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
-/** The Observer of {@link FlinkDeployment}. */
-public interface Observer {
-
-    /**
-     * Observe the flinkApp status, It will reflect the changed status on the 
flinkApp resource.
-     *
-     * @param flinkApp the target flinkDeployment resource
-     * @param context the context with which the operation is executed
-     */
-    void observe(FlinkDeployment flinkApp, Context context);
+/** The observer of {@link FlinkSessionJob}. */
+public class SessionJobObserver implements Observer<FlinkSessionJob> {
+    @Override
+    public void observe(FlinkSessionJob flinkSessionJob, Context context) {}
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
index 2364caf..d1e2f10 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
@@ -19,31 +19,34 @@
 package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 
-/** The interface of reconciler. */
-public interface Reconciler {
+/**
+ * The interface of reconciler.
+ *
+ * @param <CR> The custom resource to be reconciled.
+ */
+public interface Reconciler<CR> {
 
     /**
-     * This is called when receiving the create or update event of the 
FlinkDeployment resource.
+     * This is called when receiving the create or update event of the custom 
resource.
      *
-     * @param flinkApp the FlinkDeployment resource that has been created or 
updated
+     * @param cr the custom resource that has been created or updated
      * @param context the context with which the operation is executed
-     * @param effectiveConfig the effective config of the flinkApp
+     * @param effectiveConfig the effective config of the target resource
      */
-    void reconcile(FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig)
-            throws Exception;
+    void reconcile(CR cr, Context context, Configuration effectiveConfig) 
throws Exception;
 
     /**
-     * This is called when receiving the delete event of FlinkDeployment 
resource. This method is
-     * meant to cleanup the associated components like the Flink job 
components.
+     * This is called when receiving the delete event of custom resource. This 
method is meant to
+     * cleanup the associated components like the Flink job components.
      *
-     * @param flinkApp the FlinkDeployment resource that has been deleted
+     * @param cr the custom resource that has been deleted
+     * @param context the context with which the operation is executed
      * @param effectiveConfig the effective config of the target resource
-     * @return DeleteControl to manage the delete behavior
+     * @return DeleteControl to manage the deletion behavior
      */
-    DeleteControl cleanup(FlinkDeployment flinkApp, Configuration 
effectiveConfig);
+    DeleteControl cleanup(CR cr, Context context, Configuration 
effectiveConfig);
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 8fd27f2..7682471 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -20,15 +20,19 @@ package org.apache.flink.kubernetes.operator.reconciler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.client.CustomResource;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 
 import java.time.Duration;
@@ -74,6 +78,22 @@ public class ReconciliationUtils {
         reconciliationStatus.setError(err);
     }
 
+    /**
+     * Records a successful spec reconciliation on the given session job: marks the
+     * reconciliation status as successful, clears any stored error and saves a clone of the
+     * current spec as the last reconciled spec.
+     *
+     * @param sessionJob the session job whose reconciliation status is updated in place
+     */
+    public static void updateForSpecReconciliationSuccess(FlinkSessionJob sessionJob) {
+        FlinkSessionJobReconciliationStatus status =
+                sessionJob.getStatus().getReconciliationStatus();
+        status.setSuccess(true);
+        status.setError(null);
+        // Store a clone rather than the live spec object itself.
+        status.setLastReconciledSpec(clone(sessionJob.getSpec()));
+    }
+
+    /**
+     * Records a failed reconciliation on the given session job: marks the reconciliation status
+     * as unsuccessful and stores the given error message.
+     *
+     * @param flinkSessionJob the session job whose reconciliation status is updated in place
+     * @param err the error message to store
+     */
+    public static void updateForReconciliationError(FlinkSessionJob flinkSessionJob, String err) {
+        FlinkSessionJobReconciliationStatus status =
+                flinkSessionJob.getStatus().getReconciliationStatus();
+        status.setSuccess(false);
+        status.setError(err);
+    }
+
     public static <T> T clone(T object) {
         if (object == null) {
             return null;
@@ -87,12 +107,10 @@ public class ReconciliationUtils {
         }
     }
 
-    public static UpdateControl<FlinkDeployment> toUpdateControl(
-            FlinkOperatorConfiguration operatorConfiguration,
-            FlinkDeployment originalCopy,
-            FlinkDeployment current,
-            boolean reschedule) {
-        UpdateControl<FlinkDeployment> updateControl;
+    public static <CR extends CustomResource> UpdateControl<CR> 
toUpdateControl(
+            CR originalCopy, CR current) {
+
+        UpdateControl<CR> updateControl;
         if (!Objects.equals(originalCopy.getSpec(), current.getSpec())) {
             throw new UnsupportedOperationException(
                     "Detected spec change after reconcile, this probably 
indicates a bug.");
@@ -106,6 +124,16 @@ public class ReconciliationUtils {
             updateControl = UpdateControl.noUpdate();
         }
 
+        return updateControl;
+    }
+
+    public static UpdateControl<FlinkDeployment> toUpdateControl(
+            FlinkOperatorConfiguration operatorConfiguration,
+            FlinkDeployment originalCopy,
+            FlinkDeployment current,
+            boolean reschedule) {
+        UpdateControl<FlinkDeployment> updateControl = 
toUpdateControl(originalCopy, current);
+
         if (!reschedule) {
             return updateControl;
         }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
similarity index 82%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
index 21d6cf1..c0aced3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
@@ -15,26 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 
 /** Base deployment reconciler with functionality that is common to job and session modes. */
-public abstract class BaseReconciler implements Reconciler {
+public abstract class AbstractDeploymentReconciler implements 
Reconciler<FlinkDeployment> {
 
     protected final FlinkOperatorConfiguration operatorConfiguration;
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
 
-    public BaseReconciler(
+    public AbstractDeploymentReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
             FlinkOperatorConfiguration operatorConfiguration) {
@@ -44,7 +46,8 @@ public abstract class BaseReconciler implements Reconciler {
     }
 
     @Override
-    public DeleteControl cleanup(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
+    public DeleteControl cleanup(
+            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
         return shutdownAndDelete(flinkApp, effectiveConfig);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
similarity index 95%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 55ed5ee..b49e18f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
@@ -26,9 +26,10 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -43,17 +44,17 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
-import static 
org.apache.flink.kubernetes.operator.observer.BaseObserver.JOB_STATE_UNKNOWN;
+import static 
org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver.JOB_STATE_UNKNOWN;
 
 /**
  * Reconciler responsible for handling the job lifecycle according to the 
desired and current
  * states.
  */
-public class JobReconciler extends BaseReconciler {
+public class ApplicationReconciler extends AbstractDeploymentReconciler {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(JobReconciler.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationReconciler.class);
 
-    public JobReconciler(
+    public ApplicationReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
             FlinkOperatorConfiguration operatorConfiguration) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
similarity index 73%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index bdcd8e4..d2f55af 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -16,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -34,7 +34,7 @@ public class ReconcilerFactory {
     private final KubernetesClient kubernetesClient;
     private final FlinkService flinkService;
     private final FlinkOperatorConfiguration operatorConfiguration;
-    private final Map<Mode, Reconciler> reconcilerMap;
+    private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap;
 
     public ReconcilerFactory(
             KubernetesClient kubernetesClient,
@@ -46,7 +46,7 @@ public class ReconcilerFactory {
         this.reconcilerMap = new ConcurrentHashMap<>();
     }
 
-    public Reconciler getOrCreate(FlinkDeployment flinkApp) {
+    public Reconciler<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
         return reconcilerMap.computeIfAbsent(
                 Mode.getMode(flinkApp),
                 mode -> {
@@ -55,7 +55,7 @@ public class ReconcilerFactory {
                             return new SessionReconciler(
                                     kubernetesClient, flinkService, 
operatorConfiguration);
                         case APPLICATION:
-                            return new JobReconciler(
+                            return new ApplicationReconciler(
                                     kubernetesClient, flinkService, 
operatorConfiguration);
                         default:
                             throw new UnsupportedOperationException(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
similarity index 92%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 525efd6..98f2683 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
  * Reconciler responsible for handling the session cluster lifecycle according 
to the desired and
  * current states.
  */
-public class SessionReconciler extends BaseReconciler {
+public class SessionReconciler extends AbstractDeploymentReconciler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SessionReconciler.class);
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
new file mode 100644
index 0000000..9bb5e4a
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * The reconciler for the {@link FlinkSessionJob}.
+ *
+ * <p>It submits the job to its session cluster when no spec has been reconciled yet and cancels
+ * the job on cleanup. Reconcilers for {@link FlinkDeployment} resources, such as {@link
+ * ApplicationReconciler}, live in the {@code deployment} package.
+ */
+public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobReconciler.class);
+
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+
+    public FlinkSessionJobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    /**
+     * Submits the session job if no spec has been reconciled for it yet. A spec that differs from
+     * the last reconciled one is only logged; such changes are not applied yet.
+     *
+     * @param flinkSessionJob the session job that has been created or updated
+     * @param context the context with which the operation is executed
+     * @param defaultConfig the default config from which the cluster's effective config is built
+     * @throws Exception if submitting the job fails
+     */
+    @Override
+    public void reconcile(
+            FlinkSessionJob flinkSessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+
+        FlinkSessionJobSpec lastReconciledSpec =
+                flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+        // No reconciled spec recorded yet: this is the first deployment of the job.
+        if (lastReconciledSpec == null) {
+            submitFlinkJob(flinkSessionJob, context, defaultConfig);
+            return;
+        }
+
+        boolean specChanged = !flinkSessionJob.getSpec().equals(lastReconciledSpec);
+
+        if (specChanged) {
+            // TODO reconcile other spec change.
+            LOG.info("Other spec change have not supported");
+        }
+    }
+
+    /**
+     * Cancels the job of the deleted session job if its session cluster deployment is available
+     * and a job id has been recorded. Cancellation failures are logged and do not block deletion.
+     *
+     * @param sessionJob the session job that has been deleted
+     * @param context the context with which the operation is executed
+     * @param defaultConfig the default config from which the cluster's effective config is built
+     * @return {@link DeleteControl#defaultDelete()} in all cases
+     */
+    @Override
+    public DeleteControl cleanup(
+            FlinkSessionJob sessionJob, Context context, Configuration defaultConfig) {
+        Optional<FlinkDeployment> flinkDepOptional =
+                OperatorUtils.getSecondaryResource(sessionJob, context, operatorConfiguration);
+
+        if (flinkDepOptional.isPresent()) {
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkDepOptional.get(), defaultConfig);
+            // The job status is only set in this change once a job was submitted; guard against
+            // it being absent so that deleting a never-submitted job cannot fail here.
+            var jobStatus = sessionJob.getStatus().getJobStatus();
+            String jobID = jobStatus == null ? null : jobStatus.getJobId();
+            if (jobID != null) {
+                try {
+                    flinkService.cancelSessionJob(JobID.fromHexString(jobID), effectiveConfig);
+                } catch (Exception e) {
+                    // Best effort: the resource is still deleted when cancellation fails.
+                    LOG.error("Failed to cancel job.", e);
+                }
+            }
+        } else {
+            LOG.info("Session cluster deployment not available");
+        }
+        return DeleteControl.defaultDelete();
+    }
+
+    /**
+     * Submits the job to its session cluster if the cluster's deployment is found and its job
+     * manager deployment status is READY, then records the reconciliation as successful.
+     * Otherwise only logs why nothing was submitted.
+     */
+    private void submitFlinkJob(
+            FlinkSessionJob sessionJob, Context context, Configuration defaultConfig)
+            throws Exception {
+        Optional<FlinkDeployment> flinkDepOptional =
+                OperatorUtils.getSecondaryResource(sessionJob, context, operatorConfiguration);
+        if (flinkDepOptional.isPresent()) {
+            var flinkdep = flinkDepOptional.get();
+            var jobDeploymentStatus = flinkdep.getStatus().getJobManagerDeploymentStatus();
+            if (jobDeploymentStatus == JobManagerDeploymentStatus.READY) {
+                Configuration effectiveConfig =
+                        FlinkUtils.getEffectiveConfig(flinkdep, defaultConfig);
+                flinkService.submitJobToSessionCluster(sessionJob, effectiveConfig);
+                ReconciliationUtils.updateForSpecReconciliationSuccess(sessionJob);
+            } else {
+                LOG.info(
+                        "Session cluster deployment is in {} status, not ready for serve",
+                        jobDeploymentStatus);
+            }
+        } else {
+            LOG.info("Session cluster deployment is not found");
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 2740c53..7cf22e0 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -33,17 +33,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.operator.artifact.JarResolver;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.FileUpload;
+import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
@@ -52,9 +58,18 @@ import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMess
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.util.RestConstants;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,9 +85,14 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
+import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -83,11 +103,17 @@ public class FlinkService {
 
     private final KubernetesClient kubernetesClient;
     private final FlinkOperatorConfiguration operatorConfiguration;
+    private final JarResolver jarResolver;
+    private final ExecutorService executorService;
 
+    /**
+     * Creates the service with its own {@link JarResolver} and a fixed-size executor.
+     *
+     * @param kubernetesClient the Kubernetes client used by this service
+     * @param operatorConfiguration the operator configuration used by this service
+     */
     public FlinkService(
             KubernetesClient kubernetesClient, FlinkOperatorConfiguration 
operatorConfiguration) {
         this.kubernetesClient = kubernetesClient;
         this.operatorConfiguration = operatorConfiguration;
+        this.jarResolver = new JarResolver();
+        // Fixed pool of 4 threads; it is handed to the RestClient created for jar uploads.
+        // NOTE(review): no shutdown of this executor is visible in this change - confirm that it
+        // is released when the operator stops.
+        this.executorService =
+                Executors.newFixedThreadPool(
+                        4, new 
ExecutorThreadFactory("Flink-RestClusterClient-IO"));
     }
 
     public void submitApplicationCluster(FlinkDeployment deployment, 
Configuration conf)
@@ -132,6 +158,77 @@ public class FlinkService {
         LOG.info("Session cluster successfully deployed");
     }
 
+    /**
+     * Uploads the session job's jar to the session cluster, runs it and stores the resulting job
+     * id in the session job's status.
+     *
+     * @param sessionJob the session job to submit; its job status is replaced with one that
+     *     carries the new job id
+     * @param conf the configuration used to reach the session cluster
+     * @throws Exception if uploading the jar or running the job fails
+     */
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        JarUploadResponseBody uploadResponse = jarUpload(sessionJob, conf);
+        JarRunResponseBody runResponse = jarRun(sessionJob, uploadResponse, conf);
+        String submittedJobId = runResponse.getJobId().toHexString();
+        LOG.info("Submitted job: {} to session cluster.", submittedJobId);
+        sessionJob.getStatus().setJobStatus(JobStatus.builder().jobId(submittedJobId).build());
+    }
+
+    private JarRunResponseBody jarRun(
+            FlinkSessionJob sessionJob, JarUploadResponseBody response, 
Configuration conf) {
+        String jarId =
+                
response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        // we generate jobID in advance to help deduplicate job submission.
+        JobID jobID = new JobID();
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarRunHeaders headers = JarRunHeaders.getInstance();
+            JarRunMessageParameters parameters = 
headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            JobSpec job = sessionJob.getSpec().getJob();
+            JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : 
Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : 
null,
+                            jobID,
+                            null,
+                            
sessionJob.getSpec().getJob().getInitialSavepointPath());
+            LOG.info("Submitting job: {} to session cluster.", 
jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(
+                            
operatorConfiguration.getFlinkClientTimeout().toSeconds(),
+                            TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        }
+    }
+
+    /**
+     * Resolves the session job's jar URI to a path and uploads that file to the session cluster.
+     *
+     * <p>The target host is the configured service host override if there is one, otherwise the
+     * namespaced external service name built from the job's cluster id and namespace.
+     *
+     * @param sessionJob the session job providing the jar URI, cluster id and namespace
+     * @param conf the configuration providing the REST port and the REST client settings
+     * @return the response of the jar upload request
+     * @throws Exception if resolving the jar fails or the upload request fails or times out
+     */
+    private JarUploadResponseBody jarUpload(FlinkSessionJob sessionJob, Configuration conf)
+            throws Exception {
+        Path jarPath = jarResolver.resolve(sessionJob.getSpec().getJob().getJarURI());
+        JarUploadHeaders uploadHeaders = JarUploadHeaders.getInstance();
+        String clusterId = sessionJob.getSpec().getClusterId();
+        String namespace = sessionJob.getMetadata().getNamespace();
+        int restPort = conf.getInteger(RestOptions.PORT);
+        String restHost =
+                ObjectUtils.firstNonNull(
+                        operatorConfiguration.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+
+        try (RestClient restClient = new RestClient(conf, executorService)) {
+            // TODO add method in flink#RestClusterClient to support upload jar.
+            CompletableFuture<JarUploadResponseBody> uploadFuture =
+                    restClient.sendRequest(
+                            restHost,
+                            restPort,
+                            uploadHeaders,
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance(),
+                            Collections.singletonList(
+                                    new FileUpload(jarPath, RestConstants.CONTENT_TYPE_JAR)));
+            return uploadFuture.get(
+                    operatorConfiguration.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
+        }
+    }
+
     public boolean isJobManagerPortReady(Configuration config) {
         final URI uri;
         try (ClusterClient<String> clusterClient = getClusterClient(config)) {
@@ -184,7 +281,11 @@ public class FlinkService {
             final String clusterId = clusterClient.getClusterId();
             switch (upgradeMode) {
                 case STATELESS:
-                    clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
+                    clusterClient
+                            .cancel(jobID)
+                            .get(
+                                    
operatorConfiguration.getCancelJobTimeout().toSeconds(),
+                                    TimeUnit.SECONDS);
                     break;
                 case SAVEPOINT:
                     final String savepointDirectory =
@@ -222,6 +323,14 @@ public class FlinkService {
         return savepointOpt;
     }
 
+    public void cancelSessionJob(JobID jobID, Configuration conf) throws 
Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            clusterClient
+                    .cancel(jobID)
+                    
.get(operatorConfiguration.getCancelJobTimeout().toSeconds(), TimeUnit.SECONDS);
+        }
+    }
+
     public void stopSessionCluster(
             FlinkDeployment deployment, Configuration conf, boolean 
deleteHaData) {
         FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
index 28dc8fc..4b7cb60 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -26,6 +29,7 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentList;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
 import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.apache.commons.lang3.StringUtils;
@@ -33,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 /** Operator SDK related utility functions. */
@@ -79,4 +84,15 @@ public class OperatorUtils {
             return new 
HashSet<>(Arrays.asList(watchedNamespaces.split(NAMESPACES_SPLITTER_KEY)));
         }
     }
+
+    public static Optional<FlinkDeployment> getSecondaryResource(
+            FlinkSessionJob sessionJob,
+            Context context,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        var identifier =
+                operatorConfiguration.getWatchedNamespaces().size() >= 1
+                        ? sessionJob.getMetadata().getNamespace()
+                        : null;
+        return context.getSecondaryResource(FlinkDeployment.class, identifier);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
similarity index 86%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 828e539..3efb504 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
@@ -31,8 +32,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -43,8 +44,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-/** Default validator implementation. */
-public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
+/** Default validator implementation for {@link FlinkDeployment} and {@link FlinkSessionJob}. */
+public class DefaultValidator implements FlinkResourceValidator {
 
     private static final String[] FORBIDDEN_CONF_KEYS =
             new String[] {
@@ -55,7 +56,7 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, 
Constants.CONFIG_FILE_LOGBACK_NAME);
 
     @Override
-    public Optional<String> validate(FlinkDeployment deployment) {
+    public Optional<String> validateDeployment(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
         return firstPresent(
                 validateFlinkVersion(spec.getFlinkVersion()),
@@ -270,4 +271,42 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
 
         return Optional.empty();
     }
+
+    // validate session job
+
+    @Override
+    public Optional<String> validateSessionJob(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+
+        return firstPresent(
+                validateNotApplicationCluster(session),
+                validateSessionClusterId(sessionJob, session));
+    }
+
+    private Optional<String> validateSessionClusterId(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+        return session.flatMap(
+                deployment -> {
+                    if (!deployment
+                            .getMetadata()
+                            .getName()
+                            .equals(sessionJob.getSpec().getClusterId())) {
+                        return Optional.of(
+                                "The session job's cluster id is not match 
with the session cluster");
+                    } else {
+                        return Optional.empty();
+                    }
+                });
+    }
+
+    private Optional<String> 
validateNotApplicationCluster(Optional<FlinkDeployment> session) {
+        return session.flatMap(
+                deployment -> {
+                    if (deployment.getSpec().getJob() != null) {
+                        return Optional.of("Can not submit to application 
cluster");
+                    } else {
+                        return Optional.empty();
+                    }
+                });
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
similarity index 64%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
index 732224d..c5c13c1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java
@@ -18,11 +18,12 @@
 package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 
 import java.util.Optional;
 
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/** Validator for {@link FlinkDeployment} and {@link FlinkSessionJob} resources. */
+public interface FlinkResourceValidator {
 
     /**
      * Validate and return optional error.
@@ -30,5 +31,15 @@ public interface FlinkDeploymentValidator {
      * @param deployment
      * @return Optional error string, should be present iff validation 
resulted in an error
      */
-    Optional<String> validate(FlinkDeployment deployment);
+    Optional<String> validateDeployment(FlinkDeployment deployment);
+
+    /**
+     * Validate a session job against its target session cluster and return optional error.
+     *
+     * @param sessionJob the session job to be validated.
+     * @param session the target session cluster of the session job to be 
validated.
+     * @return Optional error string, should be present iff validation 
resulted in an error
+     */
+    Optional<String> validateSessionJob(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index fde3bfd..929afb1 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -21,7 +21,9 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -30,6 +32,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 
 import io.fabric8.kubernetes.api.model.Container;
@@ -59,6 +63,7 @@ public class TestUtils {
 
     public static final String TEST_NAMESPACE = "flink-operator-test";
     public static final String TEST_DEPLOYMENT_NAME = "test-cluster";
+    public static final String TEST_SESSION_JOB_NAME = "test-session-job";
     public static final String SERVICE_ACCOUNT = "flink-operator";
     public static final String FLINK_VERSION = "latest";
     public static final String IMAGE = String.format("flink:%s", 
FLINK_VERSION);
@@ -91,6 +96,28 @@ public class TestUtils {
         return deployment;
     }
 
+    public static FlinkSessionJob buildSessionJob() {
+        FlinkSessionJob sessionJob = new FlinkSessionJob();
+        sessionJob.setStatus(new FlinkSessionJobStatus());
+        sessionJob.setMetadata(
+                new ObjectMetaBuilder()
+                        .withName(TEST_SESSION_JOB_NAME)
+                        .withNamespace(TEST_NAMESPACE)
+                        .build());
+        sessionJob.setSpec(
+                FlinkSessionJobSpec.builder()
+                        .clusterId(TEST_DEPLOYMENT_NAME)
+                        .job(
+                                JobSpec.builder()
+                                        .jarURI(SAMPLE_JAR)
+                                        .parallelism(1)
+                                        .upgradeMode(UpgradeMode.STATELESS)
+                                        .state(JobState.RUNNING)
+                                        .build())
+                        .build());
+        return sessionJob;
+    }
+
     public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
         Map<String, String> conf = new HashMap<>();
         conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
@@ -195,6 +222,41 @@ public class TestUtils {
         };
     }
 
+    public static Context createContextWithReadyFlinkDeployment() {
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty();
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(
+                    Class<T> expectedType, String eventSourceName) {
+                var session = buildSessionCluster();
+                
session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+                return Optional.of((T) session);
+            }
+        };
+    }
+
+    public static Context createContextWithNotReadyFlinkDeployment() {
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty();
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(
+                    Class<T> expectedType, String eventSourceName) {
+                var session = buildSessionCluster();
+                session.getStatus()
+                        
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+                return Optional.of((T) session);
+            }
+        };
+    }
+
     public static final String DEPLOYMENT_ERROR = "test deployment error 
message";
 
     public static Context createContextWithFailedJobManagerDeployment() {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 214bd03..b8721e0 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
@@ -81,6 +82,18 @@ public class TestingFlinkService extends FlinkService {
     }
 
     @Override
+    public void submitJobToSessionCluster(FlinkSessionJob sessionJob, 
Configuration conf) {
+        JobID jobID = new JobID();
+        JobStatusMessage jobStatusMessage =
+                new JobStatusMessage(
+                        jobID,
+                        sessionJob.getMetadata().getName(),
+                        JobStatus.RUNNING,
+                        System.currentTimeMillis());
+        jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), 
jobStatusMessage));
+    }
+
+    @Override
     public List<JobStatusMessage> listJobs(Configuration conf) throws 
Exception {
         listJobConsumer.accept(conf);
         if (jobs.isEmpty() && !sessions.isEmpty()) {
@@ -121,6 +134,13 @@ public class TestingFlinkService extends FlinkService {
     }
 
     @Override
+    public void cancelSessionJob(JobID jobID, Configuration conf) throws 
Exception {
+        if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) {
+            throw new Exception("Job not found");
+        }
+    }
+
+    @Override
     public void stopSessionCluster(
             FlinkDeployment deployment, Configuration conf, boolean deleteHa) {
         sessions.remove(deployment.getMetadata().getName());
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
similarity index 60%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
rename to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
index 732224d..694cf84 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/artifact/JarResolverTest.java
@@ -15,20 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.validation;
+package org.apache.flink.kubernetes.operator.artifact;
 
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
+import java.nio.file.Path;
 
-/** Validator for {@link FlinkDeployment} resources. */
-public interface FlinkDeploymentValidator {
+/** Test for {@link JarResolver}. */
+public class JarResolverTest {
 
-    /**
-     * Validate and return optional error.
-     *
-     * @param deployment
-     * @return Optional error string, should be present iff validation 
resulted in an error
-     */
-    Optional<String> validate(FlinkDeployment deployment);
+    @Test
+    public void testResolve() throws Exception {
+        String jarUri = "file:///opt/flink/test.jar";
+        JarResolver resolver = new JarResolver();
+        Path path = resolver.resolve(jarUri);
+        Assertions.assertEquals("/opt/flink/test.jar", 
path.toAbsolutePath().toString());
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 3b93713..3b358ef 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -28,16 +28,16 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.observer.BaseObserver;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -461,7 +461,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         // jobStatus has not been set at this time
-        assertEquals(BaseObserver.JOB_STATE_UNKNOWN, jobStatus.getState());
+        assertEquals(AbstractDeploymentObserver.JOB_STATE_UNKNOWN, 
jobStatus.getState());
 
         // Switches operator mode to SESSION
         appCluster.getSpec().setJob(null);
@@ -607,8 +607,7 @@ public class FlinkDeploymentControllerTest {
                         defaultConfig,
                         operatorConfiguration,
                         kubernetesClient,
-                        "test",
-                        new DefaultDeploymentValidator(),
+                        new DefaultValidator(),
                         new ReconcilerFactory(
                                 kubernetesClient, flinkService, 
operatorConfiguration),
                         new ObserverFactory(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ApplicationObserverTest.java
similarity index 93%
rename from 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
rename to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ApplicationObserverTest.java
index 8ea1063..4564e3d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ApplicationObserverTest.java
@@ -24,8 +24,11 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.AbstractDeploymentObserver;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.ApplicationObserver;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
@@ -37,8 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** {@link JobObserver} unit tests. */
-public class JobObserverTest {
+/** {@link ApplicationObserver} unit tests. */
+public class ApplicationObserverTest {
 
     private final Context readyContext = 
TestUtils.createContextWithReadyJobManagerDeployment();
 
@@ -46,8 +49,8 @@ public class JobObserverTest {
     public void observeApplicationCluster() {
         Configuration flinkConf = new Configuration();
         TestingFlinkService flinkService = new TestingFlinkService();
-        JobObserver observer =
-                new JobObserver(
+        ApplicationObserver observer =
+                new ApplicationObserver(
                         flinkService,
                         FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
                         flinkConf);
@@ -134,15 +137,16 @@ public class JobObserverTest {
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                BaseObserver.JOB_STATE_UNKNOWN, 
deployment.getStatus().getJobStatus().getState());
+                AbstractDeploymentObserver.JOB_STATE_UNKNOWN,
+                deployment.getStatus().getJobStatus().getState());
     }
 
     @Test
     public void observeSavepoint() throws Exception {
         Configuration flinkConf = new Configuration();
         TestingFlinkService flinkService = new TestingFlinkService();
-        JobObserver observer =
-                new JobObserver(
+        ApplicationObserver observer =
+                new ApplicationObserver(
                         flinkService,
                         FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
                         flinkConf);
@@ -205,8 +209,8 @@ public class JobObserverTest {
     public void observeListJobsError() {
         Configuration flinkConf = new Configuration();
         TestingFlinkService flinkService = new TestingFlinkService();
-        JobObserver observer =
-                new JobObserver(
+        ApplicationObserver observer =
+                new ApplicationObserver(
                         flinkService,
                         FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
                         flinkConf);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 003195e..5524f7c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.observer.deployment.SessionObserver;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
similarity index 93%
rename from 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
rename to 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index dc33b29..82f7f33 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -26,8 +26,9 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -44,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link JobStatusObserver unit tests */
-public class JobReconcilerTest {
+public class ApplicationReconcilerTest {
 
     private final FlinkOperatorConfiguration operatorConfiguration =
             FlinkOperatorConfiguration.fromConfiguration(new Configuration());
@@ -54,7 +55,8 @@ public class JobReconcilerTest {
         Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
 
-        JobReconciler reconciler = new JobReconciler(null, flinkService, 
operatorConfiguration);
+        ApplicationReconciler reconciler =
+                new ApplicationReconciler(null, flinkService, 
operatorConfiguration);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration config = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
 
@@ -105,8 +107,8 @@ public class JobReconcilerTest {
         final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
 
-        final JobReconciler reconciler =
-                new JobReconciler(null, flinkService, operatorConfiguration);
+        final ApplicationReconciler reconciler =
+                new ApplicationReconciler(null, flinkService, 
operatorConfiguration);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         final Configuration config = FlinkUtils.getEffectiveConfig(deployment, 
new Configuration());
 
@@ -156,7 +158,8 @@ public class JobReconcilerTest {
     public void triggerSavepoint() throws Exception {
         Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
-        JobReconciler reconciler = new JobReconciler(null, flinkService, 
operatorConfiguration);
+        ApplicationReconciler reconciler =
+                new ApplicationReconciler(null, flinkService, 
operatorConfiguration);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration config = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
 
@@ -217,8 +220,8 @@ public class JobReconcilerTest {
         final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
 
-        final JobReconciler reconciler =
-                new JobReconciler(null, flinkService, operatorConfiguration);
+        final ApplicationReconciler reconciler =
+                new ApplicationReconciler(null, flinkService, 
operatorConfiguration);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         final Configuration config = FlinkUtils.getEffectiveConfig(deployment, 
new Configuration());
         
deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
@@ -274,8 +277,8 @@ public class JobReconcilerTest {
         final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
 
-        final JobReconciler reconciler =
-                new JobReconciler(null, flinkService, operatorConfiguration);
+        final ApplicationReconciler reconciler =
+                new ApplicationReconciler(null, flinkService, 
operatorConfiguration);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         final Configuration config = FlinkUtils.getEffectiveConfig(deployment, 
new Configuration());
 
@@ -317,7 +320,8 @@ public class JobReconcilerTest {
         Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
 
-        JobReconciler reconciler = new JobReconciler(null, flinkService, 
operatorConfiguration);
+        ApplicationReconciler reconciler =
+                new ApplicationReconciler(null, flinkService, 
operatorConfiguration);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration config = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
similarity index 93%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconcilerTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 61b1f1c..9a975e1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-/** Tests for {@link SessionReconciler}. */
+/**
+ * Tests for {@link 
org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler}.
+ */
 public class SessionReconcilerTest {
 
     private final FlinkOperatorConfiguration operatorConfiguration =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
new file mode 100644
index 0000000..969c5fd
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.sessionjob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link FlinkSessionJobReconciler}. */
+public class FlinkSessionJobReconcilerTest {
+
+    private final FlinkOperatorConfiguration operatorConfiguration =
+            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+
+    @Test
+    public void testSubmitAndCleanUp() throws Exception {
+        TestingFlinkService flinkService = new TestingFlinkService();
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        FlinkSessionJobReconciler reconciler =
+                new FlinkSessionJobReconciler(null, flinkService, 
operatorConfiguration);
+        reconciler.reconcile(sessionJob, TestUtils.createEmptyContext(), new 
Configuration());
+        Assertions.assertEquals(0, flinkService.listJobs().size());
+        reconciler.reconcile(
+                sessionJob,
+                TestUtils.createContextWithNotReadyFlinkDeployment(),
+                new Configuration());
+        Assertions.assertEquals(0, flinkService.listJobs().size());
+        reconciler.reconcile(
+                sessionJob, TestUtils.createContextWithReadyFlinkDeployment(), 
new Configuration());
+        Assertions.assertEquals(1, flinkService.listJobs().size());
+        // clean up
+        sessionJob
+                .getStatus()
+                .getJobStatus()
+                
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
+        reconciler.cleanup(
+                sessionJob, TestUtils.createContextWithReadyFlinkDeployment(), 
new Configuration());
+        Assertions.assertEquals(0, flinkService.listJobs().size());
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 76bd26a..cf4507f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
similarity index 80%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5f6a896..9c54763 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -22,19 +22,21 @@ import 
org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
-import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -43,14 +45,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Test deployment validation logic. */
-public class DeploymentValidatorTest {
+public class DefaultValidatorTest {
 
-    private final DefaultDeploymentValidator validator = new 
DefaultDeploymentValidator();
+    private final DefaultValidator validator = new DefaultValidator();
 
     @Test
     public void testValidation() {
@@ -279,18 +282,76 @@ public class DeploymentValidatorTest {
         testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
     }
 
+    @Test
+    public void testSessionJobWithSession() {
+        testSessionJobValidateSuccess(job -> {}, session -> {});
+
+        testSessionJobValidateError(
+                sessionJob -> sessionJob.getSpec().setClusterId("not-match"),
+                deployment -> {},
+                "The session job's cluster id is not match with the session 
cluster");
+
+        testSessionJobValidateError(
+                job -> {},
+                deployment -> deployment.getSpec().setJob(new JobSpec()),
+                "Can not submit to application cluster");
+    }
+
     private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         deploymentModifier.accept(deployment);
-        validator.validate(deployment).ifPresent(Assert::fail);
+        validator.validateDeployment(deployment).ifPresent(Assertions::fail);
     }
 
     private void testError(Consumer<FlinkDeployment> deploymentModifier, 
String expectedErr) {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         deploymentModifier.accept(deployment);
-        Optional<String> error = validator.validate(deployment);
+        Optional<String> error = validator.validateDeployment(deployment);
+        if (error.isPresent()) {
+            assertTrue(error.get().startsWith(expectedErr), error.get());
+        } else {
+            fail("Did not get expected error: " + expectedErr);
+        }
+    }
+
+    private void testSessionJobValidateSuccess(
+            Consumer<FlinkSessionJob> sessionJobModifier,
+            Consumer<FlinkDeployment> sessionModifier) {
+        FlinkDeployment session = TestUtils.buildSessionCluster();
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        sessionModifier.accept(session);
+        sessionJobModifier.accept(sessionJob);
+        validator.validateSessionJob(sessionJob, 
Optional.of(session)).ifPresent(Assertions::fail);
+    }
+
+    private void testSessionJobValidateError(
+            Consumer<FlinkSessionJob> sessionJobModifier,
+            Consumer<FlinkDeployment> sessionModifier,
+            String expectedErr) {
+        FlinkDeployment session = TestUtils.buildSessionCluster();
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        sessionModifier.accept(session);
+        sessionJobModifier.accept(sessionJob);
+        testSessionJobValidateError(sessionJob, Optional.of(session), 
expectedErr);
+    }
+
+    private void testSessionJobValidateError(
+            Consumer<FlinkSessionJob> sessionJobModifier,
+            Supplier<Optional<FlinkDeployment>> sessionSupplier,
+            String expectedErr) {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        sessionJobModifier.accept(sessionJob);
+        testSessionJobValidateError(sessionJob, sessionSupplier.get(), 
expectedErr);
+    }
+
+    private void testSessionJobValidateError(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session, 
String expectedErr) {
+        Optional<String> error = validator.validateSessionJob(sessionJob, 
session);
         if (error.isPresent()) {
-            assertTrue(error.get(), error.get().startsWith(expectedErr));
+            assertTrue(error.get().startsWith(expectedErr), error.get());
         } else {
             fail("Did not get expected error: " + expectedErr);
         }
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index c534273..ff1db98 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -18,7 +18,7 @@
 package org.apache.flink.kubernetes.operator.admission;
 
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
-import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -56,7 +56,7 @@ public class FlinkOperatorWebhook {
     public static void main(String[] args) throws Exception {
         LOG.info("Starting Flink Kubernetes Webhook");
         AdmissionHandler endpoint =
-                new AdmissionHandler(new FlinkValidator(new 
DefaultDeploymentValidator()));
+                new AdmissionHandler(new FlinkValidator(new 
DefaultValidator()));
         ChannelInitializer<SocketChannel> initializer = 
createChannelInitializer(endpoint);
         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
index 838e618..e2dec7a 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
@@ -21,7 +21,7 @@ import 
org.apache.flink.kubernetes.operator.admission.admissioncontroller.NotAll
 import 
org.apache.flink.kubernetes.operator.admission.admissioncontroller.Operation;
 import 
org.apache.flink.kubernetes.operator.admission.admissioncontroller.validation.Validator;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
@@ -35,9 +35,9 @@ public class FlinkValidator implements 
Validator<GenericKubernetesResource> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkValidator.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    private final FlinkDeploymentValidator deploymentValidator;
+    private final FlinkResourceValidator deploymentValidator;
 
-    public FlinkValidator(FlinkDeploymentValidator deploymentValidator) {
+    public FlinkValidator(FlinkResourceValidator deploymentValidator) {
         this.deploymentValidator = deploymentValidator;
     }
 
@@ -49,7 +49,7 @@ public class FlinkValidator implements 
Validator<GenericKubernetesResource> {
         FlinkDeployment flinkDeployment =
                 objectMapper.convertValue(resource, FlinkDeployment.class);
 
-        Optional<String> validationError = 
deploymentValidator.validate(flinkDeployment);
+        Optional<String> validationError = 
deploymentValidator.validateDeployment(flinkDeployment);
         if (validationError.isPresent()) {
             throw new NotAllowedException(validationError.get());
         }
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index fcf0d21..e41a620 100644
--- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.kubernetes.operator.admission;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class AdmissionHandlerTest {
 
     private final AdmissionHandler admissionHandler =
-            new AdmissionHandler(new FlinkValidator(new 
DefaultDeploymentValidator()));
+            new AdmissionHandler(new FlinkValidator(new DefaultValidator()));
 
     @Test
     public void testHandleIllegalRequest() {
diff --git a/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties b/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
index 47b6664..8e3e456 100644
--- a/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
+++ b/helm/flink-kubernetes-operator/conf/flink-operator-config/log4j2.properties
@@ -23,4 +23,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender
 appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} 
%highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]}
 %msg%n%throwable}
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} 
%highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]}
 %msg%n%throwable}
\ No newline at end of file
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 6ea6bee..475449d 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -85,7 +85,7 @@ spec:
                     type: boolean
                   error:
                     type: string
-                  flinkSessionJobSpec:
+                  lastReconciledSpec:
                     properties:
                       clusterId:
                         type: string
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index 305ea4b..64bc5c8 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -74,6 +74,9 @@ spec:
               mountPath: /opt/flink-operator/conf
             - name: flink-default-config-volume
               mountPath: /opt/flink/conf
+            {{- if .Values.operatorVolumeMounts.create }}
+                {{- toYaml .Values.operatorVolumeMounts.data | nindent 12 }}
+            {{- end }}
         {{- if .Values.webhook.create }}
         - name: flink-webhook
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag | 
default .Chart.AppVersion }}"
@@ -123,6 +126,9 @@ spec:
                 path: flink-conf.yaml
               - key: log4j-console.properties
                 path: log4j-console.properties
+        {{- if .Values.operatorVolumes.create }}
+              {{- toYaml .Values.operatorVolumes.data | nindent 8 }}
+        {{- end }}
         {{- if .Values.webhook.create }}
         - name: keystore
           secret:
diff --git a/helm/flink-kubernetes-operator/templates/rbac.yaml b/helm/flink-kubernetes-operator/templates/rbac.yaml
index 8fa96aa..57ee3f4 100644
--- a/helm/flink-kubernetes-operator/templates/rbac.yaml
+++ b/helm/flink-kubernetes-operator/templates/rbac.yaml
@@ -59,6 +59,8 @@ rules:
     resources:
       - flinkdeployments
       - flinkdeployments/status
+      - flinksessionjobs
+      - flinksessionjobs/status
     verbs:
       - "*"
   - apiGroups:
diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml
index b634f83..976d61f 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -41,6 +41,23 @@ jobServiceAccount:
     "helm.sh/resource-policy": keep
   name: "flink"
 
+operatorVolumeMounts:
+  create: false
+  data:
+    - name: flink-artifacts
+      mountPath: /opt/flink/artifacts
+
+operatorVolumes:
+  create: false
+  data:
+    - name: flink-artifacts
+      hostPath:
+        path: /tmp/flink/artifacts
+        type: DirectoryOrCreate
+#    - name: flink-artifacts
+#      persistentVolumeClaim:
+#        claimName: flink-artifacts
+
 webhook:
   create: true
   keystore:

Reply via email to