This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b82a934 [FLINK-35831] Rotate jobId for both savepoint and stateless deploys
2b82a934 is described below

commit 2b82a9343140515646f823d85146e3e850bcb764
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jul 9 11:32:32 2024 +0200

    [FLINK-35831] Rotate jobId for both savepoint and stateless deploys
---
 .../deployment/ApplicationReconciler.java          | 12 +++++------
 .../deployment/ApplicationReconcilerTest.java      | 25 +++++++++++++++++++---
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 5f5aedba..8621c50b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -175,7 +175,8 @@ public class ApplicationReconciler
             statusRecorder.patchAndCacheStatus(relatedResource, 
ctx.getKubernetesClient());
         }
 
-        setJobIdIfNecessary(spec, relatedResource, deployConfig, 
ctx.getKubernetesClient());
+        setJobIdIfNecessary(
+                relatedResource, deployConfig, ctx.getKubernetesClient(), 
requireHaMetadata);
 
         eventRecorder.triggerEvent(
                 relatedResource,
@@ -193,10 +194,10 @@ public class ApplicationReconciler
     }
 
     private void setJobIdIfNecessary(
-            FlinkDeploymentSpec spec,
             FlinkDeployment resource,
             Configuration deployConfig,
-            KubernetesClient client) {
+            KubernetesClient client,
+            boolean lastStateDeploy) {
         // The jobId assigned by Flink would be constant,
         // overwrite to avoid checkpoint path conflicts.
         // https://issues.apache.org/jira/browse/FLINK-19358
@@ -208,9 +209,8 @@ public class ApplicationReconciler
         }
 
         var status = resource.getStatus();
-        // generate jobId initially or rotate on every deployment when mode is 
stateless
-        if (status.getJobStatus().getJobId() == null
-                || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
+        // Rotate job id when not last-state deployment
+        if (status.getJobStatus().getJobId() == null || !lastStateDeploy) {
             String jobId = JobID.generate().toHexString();
             // record before first deployment to ensure we use it on any retry
             status.getJobStatus().setJobId(jobId);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ec323bab..6f9e17fd 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -230,7 +230,15 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                         .isFirstDeployment());
 
         JobID jobId = runningJobs.get(0).f1.getJobId();
-        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
+
+        // Last state upgrade
+        FlinkDeployment lastStateUpgrade = 
ReconciliationUtils.clone(deployment);
+        getJobSpec(lastStateUpgrade).setUpgradeMode(UpgradeMode.LAST_STATE);
+        lastStateUpgrade.getSpec().setRestartNonce(1234L);
+        reconciler.reconcile(deployment, context);
+        reconciler.reconcile(deployment, context);
+        // Make sure jobId is not rotated on last-state startup
+        verifyJobId(lastStateUpgrade, runningJobs.get(0).f1, 
runningJobs.get(0).f2, jobId);
 
         // Test stateless upgrade
         FlinkDeployment statelessUpgrade = 
ReconciliationUtils.clone(deployment);
@@ -284,7 +292,10 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                 SnapshotTriggerType.UPGRADE,
                 
getSavepointInfo(statefulUpgrade).getLastSavepoint().getTriggerType());
         assertEquals(SnapshotStatus.SUCCEEDED, 
getLastSnapshotStatus(statefulUpgrade, SAVEPOINT));
-        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
+
+        // Make sure jobId rotated on savepoint
+        verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
+        jobId = runningJobs.get(0).f1.getJobId();
 
         getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
         deployment.getSpec().setRestartNonce(100L);
@@ -325,7 +336,8 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
 
         assertEquals(1, flinkService.getRunningCount());
         assertEquals("finished_sp", runningJobs.get(0).f0);
-        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
+        // Make sure jobId rotated on savepoint
+        verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
     }
 
     private void verifyJobId(
@@ -335,6 +347,13 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), 
jobId.toHexString());
     }
 
+    /**
+     * Asserts that the running job was deployed with a fresh job id: its id differs from the
+     * previous {@code jobId} and matches the fixed job id pinned in the deploy configuration.
+     */
+    private void verifyNewJobId(JobStatusMessage status, Configuration conf, JobID jobId) {
+        // Compare JobID with JobID: comparing jobId.toHexString() (a String) against a JobID
+        // is never equal, so the assertion could never fail.
+        assertNotEquals(jobId, status.getJobId());
+        assertEquals(
+                conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID),
+                status.getJobId().toHexString());
+    }
+
     @NotNull
     private static Savepoint savepointFromSavepointInfo(
             SavepointInfo savepointInfo, Long savepointTriggerNonce) {

Reply via email to