This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f82cf5a  [FLINK-26538] Add ability to restart flink deployment
f82cf5a is described below

commit f82cf5a229b44942b1c98d74c85da16ea3f58126
Author: Aitozi <[email protected]>
AuthorDate: Sun Mar 27 22:17:15 2022 +0800

    [FLINK-26538] Add ability to restart flink deployment
---
 docs/content/docs/custom-resource/reference.md     |  1 +
 .../operator/crd/spec/FlinkDeploymentSpec.java     |  6 +++
 .../operator/reconciler/JobReconciler.java         | 12 +++---
 .../kubernetes/operator/service/FlinkService.java  |  6 ++-
 .../operator/reconciler/JobReconcilerTest.java     | 48 ++++++++++++++++++++++
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  4 ++
 6 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 28b39e5..9880eec 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -55,6 +55,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | jobManager | org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec | JobManager specs. |
 | taskManager | org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec | TaskManager specs. |
 | job | org.apache.flink.kubernetes.operator.crd.spec.JobSpec | Job specification for application deployments. Null for session clusters. |
+| restartNonce | java.lang.Long | Nonce used to manually trigger restart for the cluster. In order to trigger restart, change  the number to anything other than the current value. |
 | logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log configuration overrides for the Flink deployment. Format logConfigFileName ->  configContent. |
 
 ### FlinkSessionJobSpec
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index a743f09..349cd0f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -68,6 +68,12 @@ public class FlinkDeploymentSpec {
     private JobSpec job;
 
     /**
+     * Nonce used to manually trigger restart for the cluster. In order to trigger restart, change
+     * the number to anything other than the current value.
+     */
+    private Long restartNonce;
+
+    /**
      * Log configuration overrides for the Flink deployment. Format logConfigFileName ->
      * configContent.
      */
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 60c6851..55ed5ee 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -90,16 +90,14 @@ public class JobReconciler extends BaseReconciler {
             }
             JobState currentJobState = lastReconciledSpec.getJob().getState();
             JobState desiredJobState = jobSpec.getState();
-
             UpgradeMode upgradeMode = jobSpec.getUpgradeMode();
             JobState stateAfterReconcile = currentJobState;
             if (currentJobState == JobState.RUNNING) {
                 if (desiredJobState == JobState.RUNNING) {
-                    LOG.info("Upgrading running job, suspending first...");
+                    LOG.info("Upgrading/Restarting running job, suspending first...");
                 }
                 printCancelLogs(upgradeMode);
-                suspendJob(flinkApp, upgradeMode, effectiveConfig);
-                stateAfterReconcile = JobState.SUSPENDED;
+                stateAfterReconcile = suspendJob(flinkApp, upgradeMode, effectiveConfig);
             }
             if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
                 if (upgradeMode == UpgradeMode.STATELESS) {
@@ -202,17 +200,19 @@ public class JobReconciler extends BaseReconciler {
                 effectiveConfig);
     }
 
-    private void suspendJob(
+    private JobState suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
         final Optional<String> savepointOpt =
                 internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
 
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        jobStatus.setState(JobState.SUSPENDED.name());
+        JobState stateAfterReconcile = JobState.SUSPENDED;
+        jobStatus.setState(stateAfterReconcile.name());
         flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(
                 location -> jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location)));
+        return stateAfterReconcile;
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index b74106b..e17f185 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -58,6 +58,8 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -161,8 +163,8 @@ public class FlinkService {
                 config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
     }
 
-    public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode, Configuration conf)
-            throws Exception {
+    public Optional<String> cancelJob(
+            @Nullable JobID jobID, UpgradeMode upgradeMode, Configuration conf) throws Exception {
         Optional<String> savepointOpt = Optional.empty();
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             switch (upgradeMode) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 8747056..dc33b29 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -312,6 +312,54 @@ public class JobReconcilerTest {
         assertNull(flinkService.listJobs().get(0).f0);
     }
 
+    @Test
+    public void triggerRestart() throws Exception {
+        Context context = TestUtils.createContextWithReadyJobManagerDeployment();
+        TestingFlinkService flinkService = new TestingFlinkService();
+
+        JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+        reconciler.reconcile(deployment, context, config);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+        // Test restart job
+        FlinkDeployment restartJob = ReconciliationUtils.clone(deployment);
+        restartJob.getSpec().setRestartNonce(1L);
+        reconciler.reconcile(restartJob, context, config);
+        assertEquals(
+                JobState.SUSPENDED,
+                restartJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getJob()
+                        .getState());
+        runningJobs = flinkService.listJobs();
+        assertEquals(0, runningJobs.size());
+
+        reconciler.reconcile(restartJob, context, config);
+        assertEquals(
+                JobState.RUNNING,
+                restartJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getJob()
+                        .getState());
+        runningJobs = flinkService.listJobs();
+        assertEquals(1, runningJobs.size());
+        assertEquals(
+                1L,
+                restartJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getRestartNonce());
+    }
+
     private void verifyAndSetRunningJobsToStatus(
             FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> runningJobs) {
         assertEquals(1, runningJobs.size());
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 9c83d6d..ec6b264 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9072,6 +9072,8 @@ spec:
                     - stateless
                     type: string
                 type: object
+              restartNonce:
+                type: integer
               logConfiguration:
                 additionalProperties:
                   type: string
@@ -18175,6 +18177,8 @@ spec:
                             - stateless
                             type: string
                         type: object
+                      restartNonce:
+                        type: integer
                       logConfiguration:
                         additionalProperties:
                           type: string

Reply via email to