This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f82cf5a [FLINK-26538] Add ability to restart flink deployment
f82cf5a is described below
commit f82cf5a229b44942b1c98d74c85da16ea3f58126
Author: Aitozi <[email protected]>
AuthorDate: Sun Mar 27 22:17:15 2022 +0800
[FLINK-26538] Add ability to restart flink deployment
---
docs/content/docs/custom-resource/reference.md | 1 +
.../operator/crd/spec/FlinkDeploymentSpec.java | 6 +++
.../operator/reconciler/JobReconciler.java | 12 +++---
.../kubernetes/operator/service/FlinkService.java | 6 ++-
.../operator/reconciler/JobReconcilerTest.java | 48 ++++++++++++++++++++++
.../crds/flinkdeployments.flink.apache.org-v1.yml | 4 ++
6 files changed, 69 insertions(+), 8 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 28b39e5..9880eec 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -55,6 +55,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| jobManager | org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec | JobManager specs. |
| taskManager | org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec | TaskManager specs. |
| job | org.apache.flink.kubernetes.operator.crd.spec.JobSpec | Job specification for application deployments. Null for session clusters. |
+| restartNonce | java.lang.Long | Nonce used to manually trigger restart for the cluster. In order to trigger restart, change the number to anything other than the current value. |
| logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log configuration overrides for the Flink deployment. Format logConfigFileName -> configContent. |
### FlinkSessionJobSpec
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index a743f09..349cd0f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -68,6 +68,12 @@ public class FlinkDeploymentSpec {
private JobSpec job;
/**
+ * Nonce used to manually trigger restart for the cluster. In order to
trigger restart, change
+ * the number to anything other than the current value.
+ */
+ private Long restartNonce;
+
+ /**
* Log configuration overrides for the Flink deployment. Format
logConfigFileName ->
* configContent.
*/
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 60c6851..55ed5ee 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -90,16 +90,14 @@ public class JobReconciler extends BaseReconciler {
}
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = jobSpec.getState();
-
UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
JobState stateAfterReconcile = currentJobState;
if (currentJobState == JobState.RUNNING) {
if (desiredJobState == JobState.RUNNING) {
- LOG.info("Upgrading running job, suspending first...");
+ LOG.info("Upgrading/Restarting running job, suspending
first...");
}
printCancelLogs(upgradeMode);
- suspendJob(flinkApp, upgradeMode, effectiveConfig);
- stateAfterReconcile = JobState.SUSPENDED;
+ stateAfterReconcile = suspendJob(flinkApp, upgradeMode,
effectiveConfig);
}
if (currentJobState == JobState.SUSPENDED && desiredJobState ==
JobState.RUNNING) {
if (upgradeMode == UpgradeMode.STATELESS) {
@@ -202,17 +200,19 @@ public class JobReconciler extends BaseReconciler {
effectiveConfig);
}
- private void suspendJob(
+ private JobState suspendJob(
FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration
effectiveConfig)
throws Exception {
final Optional<String> savepointOpt =
internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
- jobStatus.setState(JobState.SUSPENDED.name());
+ JobState stateAfterReconcile = JobState.SUSPENDED;
+ jobStatus.setState(stateAfterReconcile.name());
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
savepointOpt.ifPresent(
location ->
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location)));
+ return stateAfterReconcile;
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index b74106b..e17f185 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -58,6 +58,8 @@ import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -161,8 +163,8 @@ public class FlinkService {
config, clusterId, (c, e) -> new
StandaloneClientHAServices(restServerAddress));
}
- public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode,
Configuration conf)
- throws Exception {
+ public Optional<String> cancelJob(
+ @Nullable JobID jobID, UpgradeMode upgradeMode, Configuration
conf) throws Exception {
Optional<String> savepointOpt = Optional.empty();
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
switch (upgradeMode) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 8747056..dc33b29 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -312,6 +312,54 @@ public class JobReconcilerTest {
assertNull(flinkService.listJobs().get(0).f0);
}
+ @Test
+ public void triggerRestart() throws Exception {
+ Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
+ TestingFlinkService flinkService = new TestingFlinkService();
+
+ JobReconciler reconciler = new JobReconciler(null, flinkService,
operatorConfiguration);
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration config = FlinkUtils.getEffectiveConfig(deployment, new
Configuration());
+
+ reconciler.reconcile(deployment, context, config);
+ List<Tuple2<String, JobStatusMessage>> runningJobs =
flinkService.listJobs();
+ verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+ // Test restart job
+ FlinkDeployment restartJob = ReconciliationUtils.clone(deployment);
+ restartJob.getSpec().setRestartNonce(1L);
+ reconciler.reconcile(restartJob, context, config);
+ assertEquals(
+ JobState.SUSPENDED,
+ restartJob
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .getState());
+ runningJobs = flinkService.listJobs();
+ assertEquals(0, runningJobs.size());
+
+ reconciler.reconcile(restartJob, context, config);
+ assertEquals(
+ JobState.RUNNING,
+ restartJob
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .getState());
+ runningJobs = flinkService.listJobs();
+ assertEquals(1, runningJobs.size());
+ assertEquals(
+ 1L,
+ restartJob
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getRestartNonce());
+ }
+
private void verifyAndSetRunningJobsToStatus(
FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>>
runningJobs) {
assertEquals(1, runningJobs.size());
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 9c83d6d..ec6b264 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9072,6 +9072,8 @@ spec:
- stateless
type: string
type: object
+ restartNonce:
+ type: integer
logConfiguration:
additionalProperties:
type: string
@@ -18175,6 +18177,8 @@ spec:
- stateless
type: string
type: object
+ restartNonce:
+ type: integer
logConfiguration:
additionalProperties:
type: string