This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 215912b0 [FLINK-30047] getLastSavepointStatus should return null when there is never savepoint completed or pending
215912b0 is described below

commit 215912b08ab0d915be4e84be17f5df97fd4bb6b5
Author: Clara Xiong <[email protected]>
AuthorDate: Wed Nov 16 18:24:39 2022 -0800

    [FLINK-30047] getLastSavepointStatus should return null when there is never savepoint completed or pending
---
 .../kubernetes/operator/utils/SavepointUtils.java  |  4 +--
 .../deployment/ApplicationReconcilerTest.java      | 32 ++++++++++++++--------
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 29966692..ac7de49e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -87,8 +87,8 @@ public class SavepointUtils {
                 return SavepointStatus.SUCCEEDED;
             }
         } else {
-            // Currently, we return SUCCEEDED if no savepoints were ever taken
-            return SavepointStatus.SUCCEEDED;
+            // Return null if no savepoint was ever taken
+            return null;
         }
 
         return SavepointStatus.ABANDONED;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 39b03365..0eebdcd3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -148,7 +148,7 @@ public class ApplicationReconcilerTest {
                         .deserializeLastReconciledSpecWithMeta()
                         .getMeta()
                         .isFirstDeployment());
-
+        assertEquals(null, 
SavepointUtils.getLastSavepointStatus(statelessUpgrade));
         runningJobs = flinkService.listJobs();
         assertEquals(1, flinkService.getRunningCount());
         assertNull(runningJobs.get(0).f0);
@@ -181,7 +181,8 @@ public class ApplicationReconcilerTest {
                         .getSavepointInfo()
                         .getLastSavepoint()
                         .getTriggerType());
-
+        assertEquals(
+                SavepointStatus.SUCCEEDED, 
SavepointUtils.getLastSavepointStatus(statefulUpgrade));
         verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
 
         deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
@@ -249,15 +250,16 @@ public class ApplicationReconcilerTest {
         var runningJobs = flinkService.listJobs();
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
         
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
-        assertEquals(SavepointStatus.SUCCEEDED, 
SavepointUtils.getLastSavepointStatus(deployment));
+        
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+        assertEquals(null, SavepointUtils.getLastSavepointStatus(deployment));
 
         FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment);
 
         // don't trigger if nonce is missing
         reconciler.reconcile(spDeployment, context);
         
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
-        assertEquals(
-                SavepointStatus.SUCCEEDED, 
SavepointUtils.getLastSavepointStatus(spDeployment));
+        
assertNull(spDeployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+        assertEquals(null, 
SavepointUtils.getLastSavepointStatus(spDeployment));
 
         // trigger when nonce is defined
         spDeployment
@@ -294,8 +296,7 @@ public class ApplicationReconcilerTest {
         // don't trigger when nonce is the same
         reconciler.reconcile(spDeployment, context);
         
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
-        assertEquals(
-                SavepointStatus.SUCCEEDED, 
SavepointUtils.getLastSavepointStatus(spDeployment));
+        assertEquals(null, 
SavepointUtils.getLastSavepointStatus(spDeployment));
 
         // trigger when new nonce is defined
         spDeployment
@@ -319,22 +320,31 @@ public class ApplicationReconcilerTest {
         assertEquals(
                 SavepointTriggerType.MANUAL,
                 
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
+        //  reconciled and savepoint is updated
         ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                 spDeployment.getStatus().getJobStatus().getSavepointInfo(), 
spDeployment);
         updateLastSavepoint(spDeployment);
         assertEquals(
                 SavepointStatus.SUCCEEDED, 
SavepointUtils.getLastSavepointStatus(spDeployment));
 
-        // don't trigger nonce is cleared
-        spDeployment.getSpec().getJob().setSavepointTriggerNonce(null);
+        // re-trigger, reconciled but savepoint is not updated
+        spDeployment
+                .getSpec()
+                .getJob()
+                
.setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
         reconciler.reconcile(spDeployment, context);
-        
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                spDeployment.getStatus().getJobStatus().getSavepointInfo(), 
spDeployment);
+        
spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
         assertEquals(
-                SavepointStatus.SUCCEEDED, 
SavepointUtils.getLastSavepointStatus(spDeployment));
+                SavepointStatus.ABANDONED, 
SavepointUtils.getLastSavepointStatus(spDeployment));
 
+        // don't trigger when nonce is cleared
+        spDeployment.getSpec().getJob().setSavepointTriggerNonce(null);
         reconciler.reconcile(spDeployment, context);
         
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
 
+        // trigger by periodic settings
         spDeployment
                 .getSpec()
                 .getFlinkConfiguration()

Reply via email to