This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 2b82a934 [FLINK-35831] Rotate jobId for both savepoint and stateless deploys
2b82a934 is described below
commit 2b82a9343140515646f823d85146e3e850bcb764
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Jul 9 11:32:32 2024 +0200
[FLINK-35831] Rotate jobId for both savepoint and stateless deploys
---
.../deployment/ApplicationReconciler.java | 12 +++++------
.../deployment/ApplicationReconcilerTest.java | 25 +++++++++++++++++++---
2 files changed, 28 insertions(+), 9 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 5f5aedba..8621c50b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -175,7 +175,8 @@ public class ApplicationReconciler
statusRecorder.patchAndCacheStatus(relatedResource,
ctx.getKubernetesClient());
}
- setJobIdIfNecessary(spec, relatedResource, deployConfig,
ctx.getKubernetesClient());
+ setJobIdIfNecessary(
+ relatedResource, deployConfig, ctx.getKubernetesClient(),
requireHaMetadata);
eventRecorder.triggerEvent(
relatedResource,
@@ -193,10 +194,10 @@ public class ApplicationReconciler
}
private void setJobIdIfNecessary(
- FlinkDeploymentSpec spec,
FlinkDeployment resource,
Configuration deployConfig,
- KubernetesClient client) {
+ KubernetesClient client,
+ boolean lastStateDeploy) {
// The jobId assigned by Flink would be constant,
// overwrite to avoid checkpoint path conflicts.
// https://issues.apache.org/jira/browse/FLINK-19358
@@ -208,9 +209,8 @@ public class ApplicationReconciler
}
var status = resource.getStatus();
- // generate jobId initially or rotate on every deployment when mode is
stateless
- if (status.getJobStatus().getJobId() == null
- || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
+ // Rotate job id when not last-state deployment
+ if (status.getJobStatus().getJobId() == null || !lastStateDeploy) {
String jobId = JobID.generate().toHexString();
// record before first deployment to ensure we use it on any retry
status.getJobStatus().setJobId(jobId);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index ec323bab..6f9e17fd 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -230,7 +230,15 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
.isFirstDeployment());
JobID jobId = runningJobs.get(0).f1.getJobId();
- verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2,
jobId);
+
+ // Last-state upgrade: clone the running deployment, switch it to LAST_STATE and
+ // bump the restart nonce so a new deployment is requested.
+ FlinkDeployment lastStateUpgrade =
ReconciliationUtils.clone(deployment);
+ getJobSpec(lastStateUpgrade).setUpgradeMode(UpgradeMode.LAST_STATE);
+ lastStateUpgrade.getSpec().setRestartNonce(1234L);
+ // NOTE(review): these calls reconcile `deployment`, not `lastStateUpgrade`, so the
+ // LAST_STATE upgrade mode and restart nonce configured above are not reconciled
+ // here — confirm whether `lastStateUpgrade` was intended.
+ reconciler.reconcile(deployment, context);
+ reconciler.reconcile(deployment, context);
+ // Make sure the jobId is NOT rotated on a last-state deployment: verifyJobId asserts
+ // that the previous jobId is still the one in use.
+ verifyJobId(lastStateUpgrade, runningJobs.get(0).f1,
runningJobs.get(0).f2, jobId);
// Test stateless upgrade
FlinkDeployment statelessUpgrade =
ReconciliationUtils.clone(deployment);
@@ -284,7 +292,10 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
SnapshotTriggerType.UPGRADE,
getSavepointInfo(statefulUpgrade).getLastSavepoint().getTriggerType());
assertEquals(SnapshotStatus.SUCCEEDED,
getLastSnapshotStatus(statefulUpgrade, SAVEPOINT));
- verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2,
jobId);
+
+ // Make sure jobId rotated on savepoint
+ verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
+ jobId = runningJobs.get(0).f1.getJobId();
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(100L);
@@ -325,7 +336,8 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertEquals(1, flinkService.getRunningCount());
assertEquals("finished_sp", runningJobs.get(0).f0);
- verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2,
jobId);
+ // Make sure jobId rotated on savepoint
+ verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
}
private void verifyJobId(
@@ -335,6 +347,13 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID),
jobId.toHexString());
}
+    /**
+     * Asserts that the running job was deployed under a newly generated job id.
+     *
+     * <p>Checks that the running job's id differs from {@code jobId}, and that the fixed job id
+     * stored in {@code conf} matches the running job's id.
+     *
+     * @param status status message of the currently running job
+     * @param conf configuration recorded for the running job
+     * @param jobId job id of the previous deployment, which must not be reused
+     */
+    private void verifyNewJobId(JobStatusMessage status, Configuration conf, JobID jobId) {
+        // Compare JobID with JobID: comparing the hex String against a JobID is never equal,
+        // which would make this assertion pass even when the job id was not rotated.
+        assertNotEquals(jobId, status.getJobId());
+        assertEquals(
+                conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID),
+                status.getJobId().toHexString());
+    }
+
@NotNull
private static Savepoint savepointFromSavepointInfo(
SavepointInfo savepointInfo, Long savepointTriggerNonce) {