This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fa78d35 [FLINK-33803] Set observedGeneration at end of reconciliation
0fa78d35 is described below

commit 0fa78d350bd4a6cf2ea5e5bca0880092104355d8
Author: Justin Chen <[email protected]>
AuthorDate: Fri Jan 26 05:43:04 2024 -0500

    [FLINK-33803] Set observedGeneration at end of reconciliation
---
 docs/content/docs/custom-resource/reference.md          |  2 ++
 .../kubernetes/operator/api/status/CommonStatus.java    |  3 +++
 .../operator/api/status/ReconciliationStatus.java       |  1 +
 .../operator/reconciler/ReconciliationUtils.java        | 17 ++++-------------
 .../operator/utils/ReconciliationUtilsTest.java         | 13 +++++++++++++
 .../crds/flinkdeployments.flink.apache.org-v1.yml       |  2 ++
 .../crds/flinksessionjobs.flink.apache.org-v1.yml       |  2 ++
 7 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 649e5fbc..030e043a 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -243,6 +243,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | ----------| ---- | ---- |
 | jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last 
observed status of the Flink job on Application/Session cluster. |
 | error | java.lang.String | Error information about the 
FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the 
FlinkDeployment/FlinkSessionJob. |
 | lifecycleState | 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | 
Lifecycle state of the Flink resource (including being rolled back, failed 
etc.). |
 | clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Information 
from running clusters. |
 | jobManagerDeploymentStatus | 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus | 
Last observed status of the JobManager deployment. |
@@ -270,6 +271,7 @@ This page serves as a full reference for FlinkDeployment 
custom resource definit
 | ----------| ---- | ---- |
 | jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last 
observed status of the Flink job on Application/Session cluster. |
 | error | java.lang.String | Error information about the 
FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the 
FlinkDeployment/FlinkSessionJob. |
 | lifecycleState | 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | 
Lifecycle state of the Flink resource (including being rolled back, failed 
etc.). |
 | reconciliationStatus | 
org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus
 | Status of the last reconcile operation. |
 
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 6c54c4c8..605c1e76 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -45,6 +45,9 @@ public abstract class CommonStatus<SPEC extends 
AbstractFlinkSpec> {
     /** Error information about the FlinkDeployment/FlinkSessionJob. */
     private String error;
 
+    /** Last observed generation of the FlinkDeployment/FlinkSessionJob. */
+    private Long observedGeneration;
+
     /** Lifecycle state of the Flink resource (including being rolled back, 
failed etc.). */
     @PrinterColumn(name = "Lifecycle State")
     // Calculated from the status, requires no setter. The purpose of this is 
to expose as a printer
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
index 88a9dc99..51203f1f 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
@@ -81,6 +81,7 @@ public abstract class ReconciliationStatus<SPEC extends 
AbstractFlinkSpec> {
     public void serializeAndSetLastReconciledSpec(
             SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
         setLastReconciledSpec(SpecUtils.writeSpecWithMeta(spec, resource));
+        
resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
     }
 
     public void markReconciledSpecAsStable() {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index a4c97bec..5929bfc1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -420,10 +420,7 @@ public class ReconciliationUtils {
                 // running
                 deployment.getSpec().getJob().setState(JobState.RUNNING);
             }
-            deployment
-                    .getMetadata()
-                    .setGeneration(
-                            
lastReconciledSpecWithMeta.getMeta().getMetadata().getGeneration());
+            
deployment.getMetadata().setGeneration(status.getObservedGeneration());
             return true;
         }
     }
@@ -469,16 +466,9 @@ public class ReconciliationUtils {
      * @return The spec generation for the upgrade.
      */
     public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?> 
resource) {
-        var lastSpecWithMeta =
-                resource.getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpecWithMeta();
+        var observedGeneration = resource.getStatus().getObservedGeneration();
 
-        if (lastSpecWithMeta.getMeta() == null) {
-            return -1L;
-        }
-
-        return lastSpecWithMeta.getMeta().getMetadata().getGeneration();
+        return observedGeneration == null ? -1L : observedGeneration;
     }
 
     /**
@@ -569,5 +559,6 @@ public class ReconciliationUtils {
 
         reconciliationStatus.setLastReconciledSpec(
                 SpecUtils.writeSpecWithMeta(lastSpecWithMeta.getSpec(), 
newMeta));
+        
resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index dc161da8..63751ceb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -34,6 +34,7 @@ import java.time.Clock;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
@@ -98,4 +99,16 @@ public class ReconciliationUtilsTest {
                 ReconciliationUtils.toUpdateControl(operatorConfiguration, 
current, previous, true);
         assertEquals(0, updateControl.getScheduleDelay().get());
     }
+
+    @Test
+    public void testObservedGenerationStatus() throws Exception {
+        FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+        app.getSpec().getJob().setState(JobState.RUNNING);
+        
app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+        app.getMetadata().setGeneration(1L);
+        assertNull(app.getStatus().getObservedGeneration());
+        ReconciliationUtils.updateStatusForDeployedSpec(app, new 
Configuration());
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(app, new 
Configuration());
+        assertEquals(1L, app.getStatus().getObservedGeneration());
+    }
 }
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 14de115c..66fd4865 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9832,6 +9832,8 @@ spec:
                 type: object
               error:
                 type: string
+              observedGeneration:
+                type: integer
               lifecycleState:
                 enum:
                 - CREATED
diff --git 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index d9b758f4..d3ef6217 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -217,6 +217,8 @@ spec:
                 type: object
               error:
                 type: string
+              observedGeneration:
+                type: integer
               lifecycleState:
                 enum:
                 - CREATED

Reply via email to