This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ae2f75a  [FLINK-26892] Observe current status before validating CR 
changes
ae2f75a is described below

commit ae2f75aea2be379190969e990640dfbb049fc346
Author: Biao Geng <[email protected]>
AuthorDate: Sat Apr 2 01:35:33 2022 +0800

    [FLINK-26892] Observe current status before validating CR changes
---
 .../flink/kubernetes/operator/FlinkOperator.java   |  4 +-
 .../flink/kubernetes/operator/config/Mode.java     | 22 +++++-
 .../controller/FlinkDeploymentController.java      | 31 ++++----
 .../kubernetes/operator/observer/BaseObserver.java | 38 ++++++++-
 .../kubernetes/operator/observer/JobObserver.java  | 32 ++++----
 .../kubernetes/operator/observer/Observer.java     |  4 +-
 .../operator/observer/ObserverFactory.java         | 15 ++--
 .../operator/observer/SessionObserver.java         | 31 ++++----
 .../operator/utils/FlinkConfigBuilder.java         | 24 ++++--
 .../kubernetes/operator/utils/FlinkUtils.java      |  9 ++-
 .../kubernetes/operator/TestingFlinkService.java   |  7 ++
 .../controller/FlinkDeploymentControllerTest.java  | 91 +++++++++++++++++++++-
 .../operator/observer/JobObserverTest.java         | 50 ++++++------
 .../operator/observer/SessionObserverTest.java     | 38 ++++-----
 14 files changed, 283 insertions(+), 113 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 2ceeafb..02757a6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -76,7 +76,9 @@ public class FlinkOperator {
         FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
         ReconcilerFactory reconcilerFactory =
                 new ReconcilerFactory(client, flinkService, 
operatorConfiguration);
-        ObserverFactory observerFactory = new ObserverFactory(flinkService, 
operatorConfiguration);
+        ObserverFactory observerFactory =
+                new ObserverFactory(
+                        flinkService, operatorConfiguration, 
defaultConfig.getFlinkConfig());
 
         controller =
                 new FlinkDeploymentController(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
index e616432..1e62b34 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
@@ -19,13 +19,33 @@
 package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 
 /** The mode of {@link FlinkDeployment}. */
 public enum Mode {
     APPLICATION,
     SESSION;
 
+    /**
+     * Return the mode of the given FlinkDeployment for Observer and 
Reconciler. Note, switching
+     * mode for an existing deployment is not allowed.
+     *
+     * @param flinkApp given FlinkDeployment
+     * @return Mode
+     */
     public static Mode getMode(FlinkDeployment flinkApp) {
-        return flinkApp.getSpec().getJob() != null ? APPLICATION : SESSION;
+        // Try to use lastReconciledSpec if it exists.
+        // The mode derived from last-reconciled spec or current spec should 
be same.
+        // If they are different, observation phase will use last-reconciled 
spec and validation
+        // phase will fail.
+        FlinkDeploymentSpec lastReconciledSpec =
+                
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        return lastReconciledSpec == null
+                ? getMode(flinkApp.getSpec())
+                : getMode(lastReconciledSpec);
+    }
+
+    private static Mode getMode(FlinkDeploymentSpec spec) {
+        return spec.getJob() != null ? APPLICATION : SESSION;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index f82b30f..0067674 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -92,34 +92,31 @@ public class FlinkDeploymentController
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, 
effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, 
effectiveConfig);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, 
Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
-
-        Optional<String> validationError = validator.validate(flinkApp);
-        if (validationError.isPresent()) {
-            LOG.error("Validation failed: " + validationError.get());
-            ReconciliationUtils.updateForReconciliationError(flinkApp, 
validationError.get());
-            return ReconciliationUtils.toUpdateControl(
-                    operatorConfiguration, originalCopy, flinkApp, false);
-        }
-
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
-
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, 
effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+            Optional<String> validationError = validator.validate(flinkApp);
+            if (validationError.isPresent()) {
+                LOG.error("Validation failed: " + validationError.get());
+                ReconciliationUtils.updateForReconciliationError(flinkApp, 
validationError.get());
+                return ReconciliationUtils.toUpdateControl(
+                        operatorConfiguration, originalCopy, flinkApp, false);
+            }
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
             reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, 
context, effectiveConfig);
         } catch (DeploymentFailedException dfe) {
             handleDeploymentFailed(flinkApp, dfe);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 676074b..4afc227 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -53,11 +54,15 @@ public abstract class BaseObserver implements Observer {
 
     protected final FlinkService flinkService;
     protected final FlinkOperatorConfiguration operatorConfiguration;
+    protected final Configuration flinkConfig;
 
     public BaseObserver(
-            FlinkService flinkService, FlinkOperatorConfiguration 
operatorConfiguration) {
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            Configuration flinkConfig) {
         this.flinkService = flinkService;
         this.operatorConfiguration = operatorConfiguration;
+        this.flinkConfig = flinkConfig;
     }
 
     protected void observeJmDeployment(
@@ -182,4 +187,35 @@ public abstract class BaseObserver implements Observer {
                 && lastReconciledSpec != null
                 && lastReconciledSpec.getJob().getState() == 
JobState.SUSPENDED;
     }
+
+    public void observe(FlinkDeployment flinkApp, Context context) {
+        FlinkDeploymentSpec lastReconciledSpec =
+                
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        // Nothing has been launched so skip observing
+        if (lastReconciledSpec == null) {
+            return;
+        }
+
+        Configuration lastValidatedConfig =
+                FlinkUtils.getEffectiveConfig(
+                        flinkApp.getMetadata(), lastReconciledSpec, 
this.flinkConfig);
+        if (!isClusterReady(flinkApp)) {
+            observeJmDeployment(flinkApp, context, lastValidatedConfig);
+        }
+        if (isClusterReady(flinkApp)) {
+            observeIfClusterReady(flinkApp, context, lastValidatedConfig);
+        }
+        clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
+    }
+
+    /**
+     * Observe the flinkApp status when the cluster is ready. It will be 
implemented by child class
+     * to reflect the changed status on the flinkApp resource.
+     *
+     * @param flinkApp the target flinkDeployment resource
+     * @param context the context with which the operation is executed
+     * @param lastValidatedConfig the last validated config
+     */
+    public abstract void observeIfClusterReady(
+            FlinkDeployment flinkApp, Context context, Configuration 
lastValidatedConfig);
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
index 9ecab3e..1c60a87 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -41,38 +41,35 @@ import java.util.concurrent.TimeoutException;
 public class JobObserver extends BaseObserver {
 
     public JobObserver(
-            FlinkService flinkService, FlinkOperatorConfiguration 
operatorConfiguration) {
-        super(flinkService, operatorConfiguration);
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            Configuration flinkConfig) {
+        super(flinkService, operatorConfiguration, flinkConfig);
     }
 
     @Override
-    public void observe(FlinkDeployment flinkApp, Context context, 
Configuration effectiveConfig) {
-        if (!isClusterReady(flinkApp)) {
-            observeJmDeployment(flinkApp, context, effectiveConfig);
+    public void observeIfClusterReady(
+            FlinkDeployment flinkApp, Context context, Configuration 
lastValidatedConfig) {
+        boolean jobFound = observeFlinkJobStatus(flinkApp, context, 
lastValidatedConfig);
+        if (jobFound) {
+            observeSavepointStatus(flinkApp, lastValidatedConfig);
         }
-        if (isClusterReady(flinkApp)) {
-            boolean jobFound = observeFlinkJobStatus(flinkApp, context, 
effectiveConfig);
-            if (jobFound) {
-                observeSavepointStatus(flinkApp, effectiveConfig);
-            }
-        }
-        clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
     }
 
     private boolean observeFlinkJobStatus(
-            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
+            FlinkDeployment flinkApp, Context context, Configuration 
lastValidatedConfig) {
         logger.info("Observing job status");
         FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
         String previousJobStatus = flinkAppStatus.getJobStatus().getState();
         Collection<JobStatusMessage> clusterJobStatuses;
         try {
-            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+            clusterJobStatuses = flinkService.listJobs(lastValidatedConfig);
         } catch (Exception e) {
             logger.error("Exception while listing jobs", e);
             flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
             if (e instanceof TimeoutException) {
                 // check for problems with the underlying deployment
-                observeJmDeployment(flinkApp, context, effectiveConfig);
+                observeJmDeployment(flinkApp, context, lastValidatedConfig);
             }
             return false;
         }
@@ -111,7 +108,8 @@ public class JobObserver extends BaseObserver {
         return status.getState();
     }
 
-    private void observeSavepointStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig) {
+    private void observeSavepointStatus(
+            FlinkDeployment flinkApp, Configuration lastValidatedConfig) {
         SavepointInfo savepointInfo = 
flinkApp.getStatus().getJobStatus().getSavepointInfo();
         if (!SavepointUtils.savepointInProgress(flinkApp)) {
             logger.debug("Savepoint not in progress");
@@ -121,7 +119,7 @@ public class JobObserver extends BaseObserver {
 
         SavepointFetchResult savepointFetchResult;
         try {
-            savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, 
effectiveConfig);
+            savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, 
lastValidatedConfig);
         } catch (Exception e) {
             logger.error("Exception while fetching savepoint info", e);
             return;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index e354689..1f7e16e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.observer;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -30,7 +29,6 @@ public interface Observer {
      *
      * @param flinkApp the target flinkDeployment resource
      * @param context the context with which the operation is executed
-     * @param effectiveConfig the effective config of the flinkApp
      */
-    void observe(FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig);
+    void observe(FlinkDeployment flinkApp, Context context);
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
index d3ec8b9..a0a3566 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.kubernetes.operator.observer;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -30,13 +31,17 @@ import java.util.concurrent.ConcurrentHashMap;
 public class ObserverFactory {
 
     private final FlinkService flinkService;
-    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final FlinkOperatorConfiguration operatorConfig;
+    private final Configuration flinkConfig;
     private final Map<Mode, Observer> observerMap;
 
     public ObserverFactory(
-            FlinkService flinkService, FlinkOperatorConfiguration 
operatorConfiguration) {
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            Configuration flinkConfig) {
         this.flinkService = flinkService;
-        this.operatorConfiguration = operatorConfiguration;
+        this.operatorConfig = operatorConfiguration;
+        this.flinkConfig = flinkConfig;
         this.observerMap = new ConcurrentHashMap<>();
     }
 
@@ -46,9 +51,9 @@ public class ObserverFactory {
                 mode -> {
                     switch (mode) {
                         case SESSION:
-                            return new SessionObserver(flinkService, 
operatorConfiguration);
+                            return new SessionObserver(flinkService, 
operatorConfig, flinkConfig);
                         case APPLICATION:
-                            return new JobObserver(flinkService, 
operatorConfiguration);
+                            return new JobObserver(flinkService, 
operatorConfig, flinkConfig);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: 
%s", mode));
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
index 80a4fbf..f74af3b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -31,27 +31,24 @@ import java.util.concurrent.TimeoutException;
 public class SessionObserver extends BaseObserver {
 
     public SessionObserver(
-            FlinkService flinkService, FlinkOperatorConfiguration 
operatorConfiguration) {
-        super(flinkService, operatorConfiguration);
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration,
+            Configuration flinkConfig) {
+        super(flinkService, operatorConfiguration, flinkConfig);
     }
 
     @Override
-    public void observe(FlinkDeployment flinkApp, Context context, 
Configuration effectiveConfig) {
-        if (!isClusterReady(flinkApp)) {
-            observeJmDeployment(flinkApp, context, effectiveConfig);
-        }
-        if (isClusterReady(flinkApp)) {
-            // Check if session cluster can serve rest calls following our 
practice in JobObserver
-            try {
-                flinkService.listJobs(effectiveConfig);
-            } catch (Exception e) {
-                logger.error("REST service in session cluster is bad now", e);
-                if (e instanceof TimeoutException) {
-                    // check for problems with the underlying deployment
-                    observeJmDeployment(flinkApp, context, effectiveConfig);
-                }
+    public void observeIfClusterReady(
+            FlinkDeployment flinkApp, Context context, Configuration 
lastValidatedConfig) {
+        // Check if session cluster can serve rest calls following our 
practice in JobObserver
+        try {
+            flinkService.listJobs(lastValidatedConfig);
+        } catch (Exception e) {
+            logger.error("REST service in session cluster is bad now", e);
+            if (e instanceof TimeoutException) {
+                // check for problems with the underlying deployment
+                observeJmDeployment(flinkApp, context, lastValidatedConfig);
             }
         }
-        clearErrorsIfJobManagerDeploymentNotInErrorStatus(flinkApp);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 2faa76e..cc59838 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.util.StringUtils;
 
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.internal.SerializationUtils;
 
@@ -50,13 +51,18 @@ import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NA
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final FlinkDeployment deploy;
+    private final ObjectMeta meta;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
     public FlinkConfigBuilder(FlinkDeployment deploy, Configuration 
flinkConfig) {
-        this.deploy = deploy;
-        this.spec = this.deploy.getSpec();
+        this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
+    }
+
+    public FlinkConfigBuilder(
+            ObjectMeta metadata, FlinkDeploymentSpec spec, Configuration 
flinkConfig) {
+        this.meta = metadata;
+        this.spec = spec;
         this.effectiveConfig = new Configuration(flinkConfig);
     }
 
@@ -179,8 +185,8 @@ public class FlinkConfigBuilder {
     public Configuration build() {
 
         // Set cluster config
-        final String namespace = deploy.getMetadata().getNamespace();
-        final String clusterId = deploy.getMetadata().getName();
+        final String namespace = meta.getNamespace();
+        final String clusterId = meta.getName();
         effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, 
namespace);
         effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, 
clusterId);
         return effectiveConfig;
@@ -188,7 +194,13 @@ public class FlinkConfigBuilder {
 
     public static Configuration buildFrom(FlinkDeployment dep, Configuration 
flinkConfig)
             throws IOException, URISyntaxException {
-        return new FlinkConfigBuilder(dep, flinkConfig)
+        return buildFrom(dep.getMetadata(), dep.getSpec(), flinkConfig);
+    }
+
+    public static Configuration buildFrom(
+            ObjectMeta meta, FlinkDeploymentSpec spec, Configuration 
flinkConfig)
+            throws IOException, URISyntaxException {
+        return new FlinkConfigBuilder(meta, spec, flinkConfig)
                 .applyFlinkConfiguration()
                 .applyLogConfiguration()
                 .applyImage()
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 8068802..1d89036 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
@@ -33,6 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.ConfigMapList;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.Service;
@@ -61,9 +63,14 @@ public class FlinkUtils {
 
     public static Configuration getEffectiveConfig(
             FlinkDeployment flinkApp, Configuration flinkConfig) {
+        return getEffectiveConfig(flinkApp.getMetadata(), flinkApp.getSpec(), 
flinkConfig);
+    }
+
+    public static Configuration getEffectiveConfig(
+            ObjectMeta meta, FlinkDeploymentSpec spec, Configuration 
flinkConfig) {
         try {
             final Configuration effectiveConfig =
-                    FlinkConfigBuilder.buildFrom(flinkApp, flinkConfig);
+                    FlinkConfigBuilder.buildFrom(meta, spec, flinkConfig);
             LOG.debug("Effective config: {}", effectiveConfig);
             return effectiveConfig;
         } catch (Exception e) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index d6d6d6d..214bd03 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /** Flink service mock for tests. */
@@ -50,6 +51,7 @@ public class TestingFlinkService extends FlinkService {
     private Set<String> sessions = new HashSet<>();
     private boolean isPortReady = true;
     private PodList podList = new PodList();
+    private Consumer<Configuration> listJobConsumer = conf -> {};
 
     public TestingFlinkService() {
         super(null, null);
@@ -80,6 +82,7 @@ public class TestingFlinkService extends FlinkService {
 
     @Override
     public List<JobStatusMessage> listJobs(Configuration conf) throws 
Exception {
+        listJobConsumer.accept(conf);
         if (jobs.isEmpty() && !sessions.isEmpty()) {
             throw new Exception("Trying to list a job without submitting it");
         }
@@ -89,6 +92,10 @@ public class TestingFlinkService extends FlinkService {
         return jobs.stream().map(t -> t.f1).collect(Collectors.toList());
     }
 
+    public void setListJobConsumer(Consumer<Configuration> listJobConsumer) {
+        this.listJobConsumer = listJobConsumer;
+    }
+
     public List<Tuple2<String, JobStatusMessage>> listJobs() {
         return new ArrayList<>(jobs);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 71348c7..3b93713 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.kubernetes.operator.controller;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -29,6 +31,7 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.observer.BaseObserver;
 import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
@@ -72,6 +75,7 @@ public class FlinkDeploymentControllerTest {
             FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
     private TestingFlinkService flinkService;
+    private DefaultConfig defaultConfig;
     private FlinkDeploymentController testController;
 
     private KubernetesMockServer mockServer;
@@ -80,6 +84,7 @@ public class FlinkDeploymentControllerTest {
     @BeforeEach
     public void setup() {
         flinkService = new TestingFlinkService();
+        defaultConfig = FlinkUtils.loadDefaultConfig();
         testController = createTestController(kubernetesClient, flinkService);
     }
 
@@ -397,7 +402,84 @@ public class FlinkDeploymentControllerTest {
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
-    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean 
allowUpgrade) {
+    @Test
+    public void verifyReconcileWithBadConfig() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override rest port, and it should be saved in lastReconciledSpec 
once a successful
+        // reconcile() finishes.
+        
appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), 
"8088");
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Check when the bad config is applied, observe() will change the 
cluster state correctly
+        appCluster.getSpec().getJobManager().setReplicas(-1);
+        // Next reconcile will set error msg and observe with previous 
validated config
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                "JobManager replicas should not be configured less than one.",
+                appCluster.getStatus().getReconciliationStatus().getError());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Make sure we do validation before getting effective config in 
reconcile().
+        appCluster.getSpec().getJobManager().setReplicas(1);
+        appCluster.getSpec().getJob().setJarURI(null);
+        // Verify the saved rest port in lastReconciledSpec is actually used 
in observe() by
+        // utilizing listJobConsumer
+        
appCluster.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), 
"12345");
+        flinkService.setListJobConsumer(
+                (configuration) -> assertEquals(8088, 
configuration.get(RestOptions.PORT)));
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+    }
+
+    @Test
+    public void verifyReconcileWithAChangedOperatorMode() {
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        updateControl = testController.reconcile(appCluster, context);
+        JobStatus jobStatus = appCluster.getStatus().getJobStatus();
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        // jobStatus has not been set at this time
+        assertEquals(BaseObserver.JOB_STATE_UNKNOWN, jobStatus.getState());
+
+        // Switches operator mode to SESSION
+        appCluster.getSpec().setJob(null);
+        // Validation fails and JobObserver should still be used
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        // Verify jobStatus is running
+        jobStatus = appCluster.getStatus().getJobStatus();
+        JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
+        assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
+        assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+    }
+
+    private void testUpgradeNotReadyCluster(FlinkDeployment appCluster, 
boolean allowUpgrade) {
         testController.reconcile(appCluster, TestUtils.createEmptyContext());
         assertEquals(
                 appCluster.getSpec(),
@@ -522,14 +604,17 @@ public class FlinkDeploymentControllerTest {
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
-                        FlinkUtils.loadDefaultConfig(),
+                        defaultConfig,
                         operatorConfiguration,
                         kubernetesClient,
                         "test",
                         new DefaultDeploymentValidator(),
                         new ReconcilerFactory(
                                 kubernetesClient, flinkService, 
operatorConfiguration),
-                        new ObserverFactory(flinkService, 
operatorConfiguration));
+                        new ObserverFactory(
+                                flinkService,
+                                operatorConfiguration,
+                                defaultConfig.getFlinkConfig()));
         controller.setControllerConfig(
                 new FlinkControllerConfig(controller, Collections.emptySet()));
         return controller;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index b7aed87..8ea1063 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -44,15 +44,17 @@ public class JobObserverTest {
 
     @Test
     public void observeApplicationCluster() {
+        Configuration flinkConf = new Configuration();
         TestingFlinkService flinkService = new TestingFlinkService();
         JobObserver observer =
                 new JobObserver(
                         flinkService,
-                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()));
+                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
+                        flinkConf);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, 
flinkConf);
 
-        observer.observe(deployment, TestUtils.createEmptyContext(), conf);
+        observer.observe(deployment, TestUtils.createEmptyContext());
 
         deployment.setStatus(new FlinkDeploymentStatus());
         deployment
@@ -66,31 +68,31 @@ public class JobObserverTest {
         flinkService.setPortReady(false);
 
         // Port not ready
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
         flinkService.setPortReady(true);
         // Port ready but we have to recheck once again
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
         // Stable ready
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
         assertEquals(JobState.RUNNING.name(), 
deployment.getStatus().getJobStatus().getState());
 
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
@@ -110,24 +112,24 @@ public class JobObserverTest {
                         >= 0);
         // Test job manager is unavailable suddenly
         flinkService.setPortReady(false);
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
         // Job manager recovers
         flinkService.setPortReady(true);
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
         // Test listing failure
         flinkService.clear();
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
@@ -137,16 +139,18 @@ public class JobObserverTest {
 
     @Test
     public void observeSavepoint() throws Exception {
+        Configuration flinkConf = new Configuration();
         TestingFlinkService flinkService = new TestingFlinkService();
         JobObserver observer =
                 new JobObserver(
                         flinkService,
-                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()));
+                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
+                        flinkConf);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, 
flinkConf);
         flinkService.submitApplicationCluster(deployment, conf);
         bringToReadyStatus(deployment);
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
@@ -155,7 +159,7 @@ public class JobObserverTest {
         assertEquals(
                 "trigger_0",
                 
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 "savepoint_0",
                 deployment
@@ -171,7 +175,7 @@ public class JobObserverTest {
         assertEquals(
                 "trigger_1",
                 
deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 "savepoint_1",
                 deployment
@@ -199,15 +203,17 @@ public class JobObserverTest {
 
     @Test
     public void observeListJobsError() {
+        Configuration flinkConf = new Configuration();
         TestingFlinkService flinkService = new TestingFlinkService();
         JobObserver observer =
                 new JobObserver(
                         flinkService,
-                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()));
+                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
+                        flinkConf);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, 
flinkConf);
         bringToReadyStatus(deployment);
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
@@ -220,9 +226,7 @@ public class JobObserverTest {
                         DeploymentFailedException.class,
                         () -> {
                             observer.observe(
-                                    deployment,
-                                    
TestUtils.createContextWithInProgressDeployment(),
-                                    conf);
+                                    deployment, 
TestUtils.createContextWithInProgressDeployment());
                         });
         assertEquals(podFailedMessage, exception.getMessage());
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 3f154c2..003195e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -47,30 +47,32 @@ public class SessionObserverTest {
     @Test
     public void observeSessionCluster() {
         TestingFlinkService flinkService = new TestingFlinkService();
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        Configuration flinkConf = new Configuration();
         SessionObserver observer =
                 new SessionObserver(
                         flinkService,
-                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()));
-        FlinkDeployment deployment = TestUtils.buildSessionCluster();
-        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
+                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()),
+                        flinkConf);
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, 
flinkConf);
         deployment
                 .getStatus()
                 .getReconciliationStatus()
                 .setLastReconciledSpec(deployment.getSpec());
 
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
 
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
 
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
         // Observe again, the JM should be READY
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
 
         assertEquals(
                 JobManagerDeploymentStatus.READY,
@@ -78,18 +80,18 @@ public class SessionObserverTest {
 
         // Test job manager is unavailable suddenly
         flinkService.setPortReady(false);
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
 
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
         // Job manager recovers
         flinkService.setPortReady(true);
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
-        observer.observe(deployment, readyContext, conf);
+        observer.observe(deployment, readyContext);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
@@ -97,6 +99,7 @@ public class SessionObserverTest {
 
     @Test
     public void testWatchMultipleNamespaces() {
+        Configuration flinkConf = new Configuration();
         FlinkService flinkService = new TestingFlinkService();
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         deployment
@@ -118,7 +121,7 @@ public class SessionObserverTest {
         k8sDeployment.setStatus(new DeploymentStatus());
 
         AtomicInteger secondaryResourceAccessed = new AtomicInteger(0);
-        Observer allNsObserver = new SessionObserver(flinkService, 
allNsConfig);
+        Observer allNsObserver = new SessionObserver(flinkService, 
allNsConfig, flinkConf);
         allNsObserver.observe(
                 deployment,
                 new Context() {
@@ -133,12 +136,12 @@ public class SessionObserverTest {
                         secondaryResourceAccessed.addAndGet(1);
                         return Optional.of((T) k8sDeployment);
                     }
-                },
-                FlinkUtils.getEffectiveConfig(deployment, new 
Configuration()));
+                });
 
         assertEquals(1, secondaryResourceAccessed.get());
 
-        Observer specificNsObserver = new SessionObserver(flinkService, 
specificNsConfig);
+        Observer specificNsObserver =
+                new SessionObserver(flinkService, specificNsConfig, flinkConf);
         specificNsObserver.observe(
                 deployment,
                 new Context() {
@@ -153,12 +156,12 @@ public class SessionObserverTest {
                         secondaryResourceAccessed.addAndGet(1);
                         return Optional.of((T) k8sDeployment);
                     }
-                },
-                FlinkUtils.getEffectiveConfig(deployment, new 
Configuration()));
+                });
 
         assertEquals(2, secondaryResourceAccessed.get());
 
-        Observer multipleNsObserver = new SessionObserver(flinkService, 
multipleNsConfig);
+        Observer multipleNsObserver =
+                new SessionObserver(flinkService, multipleNsConfig, flinkConf);
         multipleNsObserver.observe(
                 deployment,
                 new Context() {
@@ -173,8 +176,7 @@ public class SessionObserverTest {
                         secondaryResourceAccessed.addAndGet(1);
                         return Optional.of((T) k8sDeployment);
                     }
-                },
-                FlinkUtils.getEffectiveConfig(deployment, new 
Configuration()));
+                });
 
         assertEquals(3, secondaryResourceAccessed.get());
     }

Reply via email to