This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 215912b0 [FLINK-30047] getLastSavepointStatus should return null when
there is never savepoint completed or pending
215912b0 is described below
commit 215912b08ab0d915be4e84be17f5df97fd4bb6b5
Author: Clara Xiong <[email protected]>
AuthorDate: Wed Nov 16 18:24:39 2022 -0800
[FLINK-30047] getLastSavepointStatus should return null when there is never
savepoint completed or pending
---
.../kubernetes/operator/utils/SavepointUtils.java | 4 +--
.../deployment/ApplicationReconcilerTest.java | 32 ++++++++++++++--------
2 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 29966692..ac7de49e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -87,8 +87,8 @@ public class SavepointUtils {
return SavepointStatus.SUCCEEDED;
}
} else {
- // Currently, we return SUCCEEDED if no savepoints were ever taken
- return SavepointStatus.SUCCEEDED;
+ // Return null if no savepoint was ever taken
+ return null;
}
return SavepointStatus.ABANDONED;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 39b03365..0eebdcd3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -148,7 +148,7 @@ public class ApplicationReconcilerTest {
.deserializeLastReconciledSpecWithMeta()
.getMeta()
.isFirstDeployment());
-
+ assertEquals(null, SavepointUtils.getLastSavepointStatus(statelessUpgrade));
runningJobs = flinkService.listJobs();
assertEquals(1, flinkService.getRunningCount());
assertNull(runningJobs.get(0).f0);
@@ -181,7 +181,8 @@ public class ApplicationReconcilerTest {
.getSavepointInfo()
.getLastSavepoint()
.getTriggerType());
-
+ assertEquals(
+ SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(statefulUpgrade));
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2,
jobId);
deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
@@ -249,15 +250,16 @@ public class ApplicationReconcilerTest {
var runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
- assertEquals(SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(deployment));
+ assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+ assertEquals(null, SavepointUtils.getLastSavepointStatus(deployment));
FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment);
// don't trigger if nonce is missing
reconciler.reconcile(spDeployment, context);
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
- assertEquals(
- SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(spDeployment));
+ assertNull(spDeployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+ assertEquals(null, SavepointUtils.getLastSavepointStatus(spDeployment));
// trigger when nonce is defined
spDeployment
@@ -294,8 +296,7 @@ public class ApplicationReconcilerTest {
// don't trigger when nonce is the same
reconciler.reconcile(spDeployment, context);
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
- assertEquals(
- SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(spDeployment));
+ assertEquals(null, SavepointUtils.getLastSavepointStatus(spDeployment));
// trigger when new nonce is defined
spDeployment
@@ -319,22 +320,31 @@ public class ApplicationReconcilerTest {
assertEquals(
SavepointTriggerType.MANUAL,
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
+ // reconciled and savepoint is updated
ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
spDeployment.getStatus().getJobStatus().getSavepointInfo(),
spDeployment);
updateLastSavepoint(spDeployment);
assertEquals(
SavepointStatus.SUCCEEDED,
SavepointUtils.getLastSavepointStatus(spDeployment));
- // don't trigger nonce is cleared
- spDeployment.getSpec().getJob().setSavepointTriggerNonce(null);
+ // re-trigger, reconciled but savepoint is not updated
+ spDeployment
+ .getSpec()
+ .getJob()
+ .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
reconciler.reconcile(spDeployment, context);
- assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
+ ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+ spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment);
+
spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
assertEquals(
- SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(spDeployment));
+ SavepointStatus.ABANDONED, SavepointUtils.getLastSavepointStatus(spDeployment));
+ // don't trigger when nonce is cleared
+ spDeployment.getSpec().getJob().setSavepointTriggerNonce(null);
reconciler.reconcile(spDeployment, context);
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
+ // trigger by periodic settings
spDeployment
.getSpec()
.getFlinkConfiguration()