This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0fa78d35 [FLINK-33803] Set observedGeneration at end of reconciliation
0fa78d35 is described below
commit 0fa78d350bd4a6cf2ea5e5bca0880092104355d8
Author: Justin Chen <[email protected]>
AuthorDate: Fri Jan 26 05:43:04 2024 -0500
[FLINK-33803] Set observedGeneration at end of reconciliation
---
docs/content/docs/custom-resource/reference.md | 2 ++
.../kubernetes/operator/api/status/CommonStatus.java | 3 +++
.../operator/api/status/ReconciliationStatus.java | 1 +
.../operator/reconciler/ReconciliationUtils.java | 17 ++++-------------
.../operator/utils/ReconciliationUtilsTest.java | 13 +++++++++++++
.../crds/flinkdeployments.flink.apache.org-v1.yml | 2 ++
.../crds/flinksessionjobs.flink.apache.org-v1.yml | 2 ++
7 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 649e5fbc..030e043a 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -243,6 +243,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| ----------| ---- | ---- |
| jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last
observed status of the Flink job on Application/Session cluster. |
| error | java.lang.String | Error information about the
FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the
FlinkDeployment/FlinkSessionJob. |
| lifecycleState |
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState |
Lifecycle state of the Flink resource (including being rolled back, failed
etc.). |
| clusterInfo | java.util.Map<java.lang.String,java.lang.String> | Information
from running clusters. |
| jobManagerDeploymentStatus |
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus |
Last observed status of the JobManager deployment. |
@@ -270,6 +271,7 @@ This page serves as a full reference for FlinkDeployment
custom resource definit
| ----------| ---- | ---- |
| jobStatus | org.apache.flink.kubernetes.operator.api.status.JobStatus | Last
observed status of the Flink job on Application/Session cluster. |
| error | java.lang.String | Error information about the
FlinkDeployment/FlinkSessionJob. |
+| observedGeneration | java.lang.Long | Last observed generation of the
FlinkDeployment/FlinkSessionJob. |
| lifecycleState |
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState |
Lifecycle state of the Flink resource (including being rolled back, failed
etc.). |
| reconciliationStatus |
org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus
| Status of the last reconcile operation. |
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 6c54c4c8..605c1e76 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -45,6 +45,9 @@ public abstract class CommonStatus<SPEC extends
AbstractFlinkSpec> {
/** Error information about the FlinkDeployment/FlinkSessionJob. */
private String error;
+ /** Last observed generation of the FlinkDeployment/FlinkSessionJob. */
+ private Long observedGeneration;
+
/** Lifecycle state of the Flink resource (including being rolled back,
failed etc.). */
@PrinterColumn(name = "Lifecycle State")
// Calculated from the status, requires no setter. The purpose of this is
to expose as a printer
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
index 88a9dc99..51203f1f 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
@@ -81,6 +81,7 @@ public abstract class ReconciliationStatus<SPEC extends
AbstractFlinkSpec> {
public void serializeAndSetLastReconciledSpec(
SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
setLastReconciledSpec(SpecUtils.writeSpecWithMeta(spec, resource));
+
resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
}
public void markReconciledSpecAsStable() {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index a4c97bec..5929bfc1 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -420,10 +420,7 @@ public class ReconciliationUtils {
// running
deployment.getSpec().getJob().setState(JobState.RUNNING);
}
- deployment
- .getMetadata()
- .setGeneration(
-
lastReconciledSpecWithMeta.getMeta().getMetadata().getGeneration());
+
deployment.getMetadata().setGeneration(status.getObservedGeneration());
return true;
}
}
@@ -469,16 +466,9 @@ public class ReconciliationUtils {
* @return The spec generation for the upgrade.
*/
public static Long getUpgradeTargetGeneration(AbstractFlinkResource<?, ?>
resource) {
- var lastSpecWithMeta =
- resource.getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpecWithMeta();
+ var observedGeneration = resource.getStatus().getObservedGeneration();
- if (lastSpecWithMeta.getMeta() == null) {
- return -1L;
- }
-
- return lastSpecWithMeta.getMeta().getMetadata().getGeneration();
+ return observedGeneration == null ? -1L : observedGeneration;
}
/**
@@ -569,5 +559,6 @@ public class ReconciliationUtils {
reconciliationStatus.setLastReconciledSpec(
SpecUtils.writeSpecWithMeta(lastSpecWithMeta.getSpec(),
newMeta));
+
resource.getStatus().setObservedGeneration(resource.getMetadata().getGeneration());
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index dc161da8..63751ceb 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -34,6 +34,7 @@ import java.time.Clock;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for {@link
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
@@ -98,4 +99,16 @@ public class ReconciliationUtilsTest {
ReconciliationUtils.toUpdateControl(operatorConfiguration,
current, previous, true);
assertEquals(0, updateControl.getScheduleDelay().get());
}
+
+ @Test
+ public void testObservedGenerationStatus() throws Exception {
+ FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
+ app.getSpec().getJob().setState(JobState.RUNNING);
+
app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+ app.getMetadata().setGeneration(1L);
+ assertNull(app.getStatus().getObservedGeneration());
+ ReconciliationUtils.updateStatusForDeployedSpec(app, new
Configuration());
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(app, new
Configuration());
+ assertEquals(1L, app.getStatus().getObservedGeneration());
+ }
}
diff --git
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 14de115c..66fd4865 100644
---
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9832,6 +9832,8 @@ spec:
type: object
error:
type: string
+ observedGeneration:
+ type: integer
lifecycleState:
enum:
- CREATED
diff --git
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index d9b758f4..d3ef6217 100644
---
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -217,6 +217,8 @@ spec:
type: object
error:
type: string
+ observedGeneration:
+ type: integer
lifecycleState:
enum:
- CREATED