gyfora commented on a change in pull request #131:
URL: https://github.com/apache/flink-kubernetes-operator/pull/131#discussion_r839642805



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -92,34 +92,33 @@ public FlinkDeploymentController(
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, effectiveConfig);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
-        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         LOG.info("Starting reconciliation");
-
-        Optional<String> validationError = validator.validate(flinkApp);
-        if (validationError.isPresent()) {
-            LOG.error("Validation failed: " + validationError.get());
-            ReconciliationUtils.updateForReconciliationError(flinkApp, validationError.get());
-            return ReconciliationUtils.toUpdateControl(
-                    operatorConfiguration, originalCopy, flinkApp, false);
-        }
-
-        Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
-
+        FlinkDeployment originalCopy = ReconciliationUtils.clone(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+
+            Configuration effectiveConfig =
+                    FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
+            Optional<String> validationError = validator.validate(flinkApp);

Review comment:
       You changed the order of validation and getting the effective config. 
You first need to validate, and only then get the effective config.
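
To illustrate the ordering being asked for, here is a minimal, self-contained sketch. It does not use the operator's real classes: `ValidateFirstSketch`, `validate`, `effectiveParallelism` and the `parallelism` key are all hypothetical stand-ins, and the config step is assumed to throw on input that validation would have rejected.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; not the operator's real API.
final class ValidateFirstSketch {

    /** Returns an error message if the spec is invalid, or empty if it is fine. */
    static Optional<String> validate(Map<String, String> spec) {
        String parallelism = spec.get("parallelism");
        // At most 9 digits, so that Integer.parseInt below cannot overflow.
        if (parallelism == null || !parallelism.matches("\\d{1,9}")) {
            return Optional.of("parallelism must be a non-negative integer");
        }
        return Optional.empty();
    }

    /** Stand-in for building the effective config: throws on unparsable input. */
    static int effectiveParallelism(Map<String, String> spec) {
        return Integer.parseInt(spec.get("parallelism"));
    }

    /** Validates first, and only touches the config once validation has passed. */
    static String reconcile(Map<String, String> spec) {
        Optional<String> validationError = validate(spec);
        if (validationError.isPresent()) {
            // Report the error and return early; the config step is never reached.
            return "ERROR: " + validationError.get();
        }
        int parallelism = effectiveParallelism(spec);
        return "RECONCILED with parallelism " + parallelism;
    }

    public static void main(String[] args) {
        System.out.println(reconcile(Map.of("parallelism", "abc")));
        System.out.println(reconcile(Map.of("parallelism", "4")));
    }
}
```

With the two calls swapped, the first input in `main` would fail with a `NumberFormatException` instead of producing a validation error that can be reported back.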

Review comment:
       It would also be good to add a simple test to guard this by submitting 
a config in the controller test that would cause an exception in the 
getEffectiveConfig method.
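
A rough sketch of the kind of guard meant here, written as plain Java so it stands alone. Everything in it is a hypothetical miniature (`ControllerOrderGuardSketch`, its `validate`, `getEffectiveConfig` and `lastError`), not the real controller or its test harness; the assumption is that an empty config value makes the config step throw.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical miniature of the controller; names are illustrative assumptions.
final class ControllerOrderGuardSketch {

    /** Stand-in for the error recorded on the resource status. */
    static String lastError = null;

    static Optional<String> validate(Map<String, String> conf) {
        for (Map.Entry<String, String> e : conf.entrySet()) {
            if (e.getValue().trim().isEmpty()) {
                return Optional.of("Empty value for " + e.getKey());
            }
        }
        return Optional.empty();
    }

    /** Stand-in for config building: throws on values validation would reject. */
    static Map<String, String> getEffectiveConfig(Map<String, String> conf) {
        conf.forEach((key, value) -> {
            if (value.trim().isEmpty()) {
                throw new IllegalArgumentException("Cannot parse " + key);
            }
        });
        return Map.copyOf(conf);
    }

    static void reconcile(Map<String, String> conf) {
        lastError = null;
        Optional<String> validationError = validate(conf);
        if (validationError.isPresent()) {
            lastError = validationError.get();
            return;
        }
        getEffectiveConfig(conf);
    }

    /** The guard: an invalid config must be recorded as an error, not thrown. */
    static boolean invalidConfigIsReportedNotThrown() {
        try {
            reconcile(Map.of("taskmanager.numberOfTaskSlots", " "));
        } catch (RuntimeException e) {
            return false; // the config step ran before validation
        }
        return lastError != null;
    }

    public static void main(String[] args) {
        System.out.println(invalidConfigIsReportedNotThrown());
    }
}
```

If `reconcile` called `getEffectiveConfig` before `validate`, the guard method would return `false`, which is the regression such a test is meant to catch.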





