This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 48a2f68  [FLINK-26181] Support manual savepoint triggering in the 
operator
48a2f68 is described below

commit 48a2f68fef73e5f7d26cf180a8124d295c39b32b
Author: Matyas Orhidi <[email protected]>
AuthorDate: Tue Mar 1 18:18:05 2022 +0100

    [FLINK-26181] Support manual savepoint triggering in the operator
---
 README.md                                          | 12 +++
 examples/basic-checkpoint-ha.yaml                  |  1 +
 .../flink/kubernetes/operator/FlinkOperator.java   |  2 +-
 .../config/FlinkOperatorConfiguration.java         | 29 ++++----
 .../operator/config/OperatorConfigOptions.java     |  8 ++
 .../controller/FlinkDeploymentController.java      | 22 +-----
 .../kubernetes/operator/crd/spec/JobSpec.java      |  1 +
 .../kubernetes/operator/crd/status/JobStatus.java  |  2 +-
 .../crd/status/{JobStatus.java => Savepoint.java}  | 19 +++--
 .../status/{JobStatus.java => SavepointInfo.java}  | 12 ++-
 .../kubernetes/operator/observer/Observer.java     | 48 +++++++++++-
 .../operator/observer/SavepointFetchResult.java    | 46 ++++++++++++
 .../operator/reconciler/JobReconciler.java         | 28 ++++++-
 .../operator/reconciler/ReconciliationUtils.java   | 76 +++++++++++++++++++
 .../operator/reconciler/SessionReconciler.java     |  2 +-
 .../kubernetes/operator/service/FlinkService.java  | 87 ++++++++++++++++++++++
 .../kubernetes/operator/utils/SavepointUtils.java  | 48 ++++++++++++
 .../validation/DefaultDeploymentValidator.java     |  3 +-
 .../flink/kubernetes/operator/TestUtils.java       | 17 -----
 .../kubernetes/operator/TestingFlinkService.java   | 16 ++++
 .../controller/FlinkDeploymentControllerTest.java  | 19 ++---
 .../kubernetes/operator/observer/ObserverTest.java | 75 ++++++++++++++++++-
 .../operator/reconciler/JobReconcilerTest.java     | 38 ++++++++--
 .../operator/utils/FlinkConfigBuilderTest.java     |  3 +-
 .../validation/DeploymentValidatorTest.java        | 15 ++--
 .../crds/flinkdeployments.flink.apache.org-v1.yml  | 20 ++++-
 26 files changed, 548 insertions(+), 101 deletions(-)

diff --git a/README.md b/README.md
index 2baad7e..ea8c57d 100644
--- a/README.md
+++ b/README.md
@@ -143,3 +143,15 @@ and apply it on your Kubernetes environment:
 kubectl create -f pod-monitor.yaml
 ```
 Once the custom resource is created in the Kubernetes environment the operator 
metrics are ready to explore 
[http://localhost:3000/explore](http://localhost:3000/explore).
+
+# Savepoints
+Savepoints can be triggered manually by setting the `savepointTriggerNonce` 
field in the job specification to a new (nonce) value:
+```yaml
+ job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: savepoint
+    state: running
+    savepointTriggerNonce: 123
+```
+The operator will trigger a savepoint every time the modified CR is applied 
and the nonce is different from the previous value.
\ No newline at end of file
diff --git a/examples/basic-checkpoint-ha.yaml 
b/examples/basic-checkpoint-ha.yaml
index 1796bfb..e288048 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -58,3 +58,4 @@ spec:
     parallelism: 2
     upgradeMode: savepoint
     state: running
+    savepointTriggerNonce: 0
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index a579372..7e46b0c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -58,7 +58,7 @@ public class FlinkOperator {
         FlinkOperatorConfiguration operatorConfiguration =
                 
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
 
-        Observer observer = new Observer(flinkService);
+        Observer observer = new Observer(flinkService, operatorConfiguration);
 
         FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
         ReconcilerFactory factory =
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 00e741d..5a2b5cb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -20,25 +20,17 @@ package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.configuration.Configuration;
 
+import lombok.Value;
+
 /** Configuration class for operator. */
+@Value
 public class FlinkOperatorConfiguration {
 
-    private final int reconcileIntervalInSec;
+    int reconcileIntervalInSec;
 
-    private final int portCheckIntervalInSec;
+    int portCheckIntervalInSec;
 
-    public FlinkOperatorConfiguration(int reconcileIntervalInSec, int 
portCheckIntervalInSec) {
-        this.reconcileIntervalInSec = reconcileIntervalInSec;
-        this.portCheckIntervalInSec = portCheckIntervalInSec;
-    }
-
-    public int getReconcileIntervalInSec() {
-        return reconcileIntervalInSec;
-    }
-
-    public int getPortCheckIntervalInSec() {
-        return portCheckIntervalInSec;
-    }
+    int savepointTriggerGracePeriodInSec;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         int reconcileIntervalInSec =
@@ -47,6 +39,13 @@ public class FlinkOperatorConfiguration {
         int portCheckIntervalInSec =
                 operatorConfig.getInteger(
                         
OperatorConfigOptions.OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC);
-        return new FlinkOperatorConfiguration(reconcileIntervalInSec, 
portCheckIntervalInSec);
+
+        int savepointTriggerGracePeriodInSec =
+                operatorConfig.getInteger(
+                        OperatorConfigOptions
+                                
.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC);
+
+        return new FlinkOperatorConfiguration(
+                reconcileIntervalInSec, portCheckIntervalInSec, 
savepointTriggerGracePeriodInSec);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 0a80c07..6d15640 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -38,4 +38,12 @@ public class OperatorConfigOptions {
                     .withDescription(
                             "The interval in second for the controller to 
reschedule the reconcile process to "
                                     + "wait for deployment to be ready");
+
+    public static final ConfigOption<Integer>
+            OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC =
+                    
ConfigOptions.key("operator.observer.savepoint.trigger.grace-period.sec")
+                            .intType()
+                            .defaultValue(5)
+                            .withDescription(
+                                    "The interval in seconds before a 
savepoint trigger attempt is marked as unsuccessful");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 8417c7f..26c0097 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -21,11 +21,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 import 
org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
@@ -106,7 +106,7 @@ public class FlinkDeploymentController
         Optional<String> validationError = validator.validate(flinkApp);
         if (validationError.isPresent()) {
             LOG.error("Reconciliation failed: " + validationError.get());
-            updateForReconciliationError(flinkApp, validationError.get());
+            ReconciliationUtils.updateForReconciliationError(flinkApp, 
validationError.get());
             return UpdateControl.updateStatus(flinkApp);
         }
 
@@ -125,30 +125,16 @@ public class FlinkDeploymentController
                     reconcilerFactory
                             .getOrCreate(flinkApp)
                             .reconcile(operatorNamespace, flinkApp, context, 
effectiveConfig);
-            updateForReconciliationSuccess(flinkApp);
             return updateControl;
         } catch (InvalidDeploymentException ide) {
             LOG.error("Reconciliation failed", ide);
-            updateForReconciliationError(flinkApp, ide.getMessage());
+            ReconciliationUtils.updateForReconciliationError(flinkApp, 
ide.getMessage());
             return UpdateControl.updateStatus(flinkApp);
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
     }
 
-    private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
-        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
-        reconciliationStatus.setSuccess(true);
-        reconciliationStatus.setError(null);
-        reconciliationStatus.setLastReconciledSpec(flinkApp.getSpec());
-    }
-
-    private void updateForReconciliationError(FlinkDeployment flinkApp, String 
err) {
-        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
-        reconciliationStatus.setSuccess(false);
-        reconciliationStatus.setError(err);
-    }
-
     @Override
     public List<EventSource> 
prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
         Preconditions.checkNotNull(controllerConfig, "Controller config cannot 
be null");
@@ -170,7 +156,7 @@ public class FlinkDeploymentController
                 retryInfo.getAttemptCount(),
                 retryInfo.isLastAttempt());
 
-        updateForReconciliationError(
+        ReconciliationUtils.updateForReconciliationError(
                 flinkApp,
                 (e instanceof ReconciliationException) ? 
e.getCause().toString() : e.toString());
         return Optional.of(flinkApp);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index 062a5c0..9a687ac 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -36,6 +36,7 @@ public class JobSpec {
     private JobState state = JobState.RUNNING;
 
     // The below fields are excluded from equals to avoid triggering job 
upgrades on changing these
+    @EqualsAndHashCode.Exclude private long savepointTriggerNonce;
     @EqualsAndHashCode.Exclude private String initialSavepointPath;
     @EqualsAndHashCode.Exclude private UpgradeMode upgradeMode = 
UpgradeMode.STATELESS;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
index 5a8528f..bdb844a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
@@ -32,5 +32,5 @@ public class JobStatus {
     private String jobId;
     private String state;
     private String updateTime;
-    private String savepointLocation;
+    private SavepointInfo savepointInfo = new SavepointInfo();
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/Savepoint.java
similarity index 78%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/Savepoint.java
index 5a8528f..6201a37 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/Savepoint.java
@@ -18,19 +18,18 @@
 package org.apache.flink.kubernetes.operator.crd.status;
 
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-/** Status of an individual job within the Flink deployment. */
+/** Represents information about a finished savepoint. */
 @Data
-@NoArgsConstructor
 @AllArgsConstructor
-@Builder
-public class JobStatus {
-    private String jobName;
-    private String jobId;
-    private String state;
-    private String updateTime;
-    private String savepointLocation;
+@NoArgsConstructor
+public class Savepoint {
+    private long timeStamp;
+    private String location;
+
+    public static Savepoint of(String location) {
+        return new Savepoint(System.currentTimeMillis(), location);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointInfo.java
similarity index 81%
copy from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
copy to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointInfo.java
index 5a8528f..23a892e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointInfo.java
@@ -22,15 +22,13 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-/** Status of an individual job within the Flink deployment. */
+/** Stores savepoint related information. */
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-public class JobStatus {
-    private String jobName;
-    private String jobId;
-    private String state;
-    private String updateTime;
-    private String savepointLocation;
+public class SavepointInfo {
+    private Savepoint lastSavepoint;
+    private String triggerId;
+    private long triggerTimestamp;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 2772fbb..5bafee9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -18,10 +18,14 @@
 package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -43,9 +47,11 @@ public class Observer {
     private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
 
     private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
 
-    public Observer(FlinkService flinkService) {
+    public Observer(FlinkService flinkService, FlinkOperatorConfiguration 
operatorConfiguration) {
         this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
     }
 
     public boolean observe(
@@ -129,13 +135,51 @@ public class Observer {
         }
     }
 
+    private boolean observeSavepointStatus(
+            FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        SavepointInfo savepointInfo = 
flinkApp.getStatus().getJobStatus().getSavepointInfo();
+        if (savepointInfo.getTriggerId() == null) {
+            LOG.debug("Checkpointing not in progress");
+            return true;
+        }
+        SavepointFetchResult savepointFetchResult;
+        try {
+            savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, 
effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while fetching savepoint info", e);
+            return false;
+        }
+
+        if (!savepointFetchResult.isTriggered()) {
+            String error = savepointFetchResult.getError();
+            if (error != null
+                    || SavepointUtils.gracePeriodEnded(operatorConfiguration, 
savepointInfo)) {
+                String errorMsg = error != null ? error : "Savepoint status 
unknown";
+                LOG.error(errorMsg);
+                savepointInfo.setTriggerId(null);
+                ReconciliationUtils.updateForReconciliationError(flinkApp, 
errorMsg);
+                return false;
+            }
+            LOG.info("Savepoint operation not running, waiting within grace 
period");
+        }
+        if (savepointFetchResult.getSavepoint() == null) {
+            LOG.info("Savepoint not completed yet");
+            return false;
+        }
+
+        savepointInfo.setLastSavepoint(savepointFetchResult.getSavepoint());
+        savepointInfo.setTriggerId(null);
+        return true;
+    }
+
     private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
         JobManagerDeploymentStatus jmDeploymentStatus =
                 flinkApp.getStatus().getJobManagerDeploymentStatus();
 
         switch (jmDeploymentStatus) {
             case READY:
-                return observeFlinkJobStatus(flinkApp, effectiveConfig);
+                return observeFlinkJobStatus(flinkApp, effectiveConfig)
+                        && observeSavepointStatus(flinkApp, effectiveConfig);
             case MISSING:
                 return true;
             case DEPLOYING:
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
new file mode 100644
index 0000000..6d4ff34
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointFetchResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+
+import lombok.Value;
+
+/** Result of a fetch savepoint operation. */
+@Value
+public class SavepointFetchResult {
+    private final Savepoint savepoint;
+    private final boolean isTriggered;
+    private final String error;
+
+    public static SavepointFetchResult notTriggered() {
+        return new SavepointFetchResult(null, false, null);
+    }
+
+    public static SavepointFetchResult error(String error) {
+        return new SavepointFetchResult(null, false, error);
+    }
+
+    public static SavepointFetchResult pending() {
+        return new SavepointFetchResult(null, true, null);
+    }
+
+    public static SavepointFetchResult completed(Savepoint savepoint) {
+        return new SavepointFetchResult(savepoint, true, null);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 3d0cb39..6f6b597 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -26,10 +26,12 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -73,6 +75,7 @@ public class JobReconciler extends BaseReconciler {
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
+            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
             return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
                     flinkApp, operatorConfiguration);
         }
@@ -101,12 +104,20 @@ public class JobReconciler extends BaseReconciler {
                     restoreFromLastSavepoint(flinkApp, effectiveConfig);
                 } else if (upgradeMode == UpgradeMode.LAST_STATE) {
                     final String savepointLocation =
-                            
flinkApp.getStatus().getJobStatus().getSavepointLocation();
+                            flinkApp.getStatus()
+                                    .getJobStatus()
+                                    .getSavepointInfo()
+                                    .getLastSavepoint()
+                                    .getLocation();
                     // Upgrade mode changes from savepoint -> last-state
                     deployFlinkJob(
                             flinkApp, effectiveConfig, 
Optional.ofNullable(savepointLocation));
                 }
             }
+            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
+        } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp)) {
+            triggerSavepoint(flinkApp, effectiveConfig);
+            ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
         }
 
         return UpdateControl.updateStatus(flinkApp)
@@ -136,7 +147,10 @@ public class JobReconciler extends BaseReconciler {
     private void restoreFromLastSavepoint(FlinkDeployment flinkApp, 
Configuration effectiveConfig)
             throws Exception {
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        deployFlinkJob(flinkApp, effectiveConfig, 
Optional.of(jobStatus.getSavepointLocation()));
+        deployFlinkJob(
+                flinkApp,
+                effectiveConfig,
+                
Optional.of(jobStatus.getSavepointInfo().getLastSavepoint().getLocation()));
     }
 
     private void printCancelLogs(UpgradeMode upgradeMode, String name) {
@@ -166,7 +180,10 @@ public class JobReconciler extends BaseReconciler {
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
         
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        savepointOpt.ifPresent(jobStatus::setSavepointLocation);
+        savepointOpt.ifPresent(
+                location -> {
+                    
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location));
+                });
         return savepointOpt;
     }
 
@@ -189,4 +206,9 @@ public class JobReconciler extends BaseReconciler {
 
         FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
     }
+
+    private void triggerSavepoint(FlinkDeployment deployment, Configuration 
effectiveConfig)
+            throws Exception {
+        flinkService.triggerSavepoint(deployment, effectiveConfig);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
new file mode 100644
index 0000000..d429f66
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Reconciliation utilities. */
+public class ReconciliationUtils {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static void updateForSpecReconciliationSuccess(FlinkDeployment 
flinkApp) {
+        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(true);
+        reconciliationStatus.setError(null);
+        FlinkDeploymentSpec clonedSpec = clone(flinkApp.getSpec());
+        if (reconciliationStatus.getLastReconciledSpec() != null) {
+            long oldSavepointTriggerNonce =
+                    reconciliationStatus
+                            .getLastReconciledSpec()
+                            .getJob()
+                            .getSavepointTriggerNonce();
+            
clonedSpec.getJob().setSavepointTriggerNonce(oldSavepointTriggerNonce);
+        }
+        reconciliationStatus.setLastReconciledSpec(clonedSpec);
+    }
+
+    public static void updateSavepointReconciliationSuccess(FlinkDeployment 
flinkApp) {
+        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(true);
+        reconciliationStatus.setError(null);
+        reconciliationStatus
+                .getLastReconciledSpec()
+                .getJob()
+                
.setSavepointTriggerNonce(flinkApp.getSpec().getJob().getSavepointTriggerNonce());
+    }
+
+    public static void updateForReconciliationError(FlinkDeployment flinkApp, 
String err) {
+        ReconciliationStatus reconciliationStatus = 
flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(false);
+        reconciliationStatus.setError(err);
+    }
+
+    public static <T> T clone(T object) {
+        if (object == null) {
+            return null;
+        }
+        try {
+            return (T)
+                    objectMapper.readValue(
+                            objectMapper.writeValueAsString(object), 
object.getClass());
+        } catch (JsonProcessingException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 7596b4d..a4b64ce 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -64,7 +64,7 @@ public class SessionReconciler extends BaseReconciler {
         if (specChanged) {
             upgradeSessionCluster(flinkApp, effectiveConfig);
         }
-
+        ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
         return UpdateControl.updateStatus(flinkApp)
                 .rescheduleAfter(
                         operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 9822d90..140baea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -35,9 +35,21 @@ import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorato
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
@@ -51,6 +63,7 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 /** Service for submitting and interacting with Flink clusters and jobs. */
@@ -168,4 +181,78 @@ public class FlinkService {
         FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
         FlinkUtils.waitForClusterShutdown(kubernetesClient, conf);
     }
+
+    public void triggerSavepoint(FlinkDeployment deployment, Configuration conf) throws Exception {
+        LOG.info("Triggering savepoint on " + deployment.getMetadata().getName());
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
+            SavepointTriggerMessageParameters savepointTriggerMessageParameters =
+                    savepointTriggerHeaders.getUnresolvedMessageParameters();
+            savepointTriggerMessageParameters.jobID.resolve(
+                    JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()));
+
+            TriggerResponse response =
+                    clusterClient
+                            .sendRequest(
+                                    savepointTriggerHeaders,
+                                    savepointTriggerMessageParameters,
+                                    new SavepointTriggerRequestBody(null, false))
+                            .get();
+            LOG.info("Savepoint triggered: " + response.getTriggerId().toHexString());
+
+            org.apache.flink.kubernetes.operator.crd.status.SavepointInfo savepointInfo =
+                    deployment.getStatus().getJobStatus().getSavepointInfo();
+
+            savepointInfo.setTriggerId(response.getTriggerId().toHexString());
+            savepointInfo.setTriggerTimestamp(System.currentTimeMillis());
+        }
+    }
+
+    public SavepointFetchResult fetchSavepointInfo(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        LOG.info(
+                "Fetching savepoint result with triggerId: "
+                        + deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
+            SavepointStatusMessageParameters savepointStatusMessageParameters =
+                    savepointStatusHeaders.getUnresolvedMessageParameters();
+            savepointStatusMessageParameters.jobIdPathParameter.resolve(
+                    JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()));
+            savepointStatusMessageParameters.triggerIdPathParameter.resolve(
+                    TriggerId.fromHexString(
+                            deployment
+                                    .getStatus()
+                                    .getJobStatus()
+                                    .getSavepointInfo()
+                                    .getTriggerId()));
+            CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
+                    clusterClient.sendRequest(
+                            savepointStatusHeaders,
+                            savepointStatusMessageParameters,
+                            EmptyRequestBody.getInstance());
+
+            if (response.get() == null || response.get().resource() == null) {
+                return SavepointFetchResult.notTriggered();
+            }
+
+            if (response.get().resource().getLocation() == null) {
+                if (response.get().resource().getFailureCause() != null) {
+                    LOG.error("Savepoint error", response.get().resource().getFailureCause());
+                    return SavepointFetchResult.error(
+                            response.get().resource().getFailureCause().getMessage());
+                } else {
+                    return SavepointFetchResult.pending();
+                }
+            }
+
+            Savepoint savepoint =
+                    new Savepoint(
+                            System.currentTimeMillis(), response.get().resource().getLocation());
+            LOG.info("Savepoint result: " + savepoint);
+            return SavepointFetchResult.completed(savepoint);
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
new file mode 100644
index 0000000..9366472
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+
+import java.util.concurrent.TimeUnit;
+
+/** Savepoint utilities. */
+public class SavepointUtils {
+    public static boolean shouldTriggerSavepoint(FlinkDeployment flinkDeployment) {
+        if (flinkDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId() != null) {
+            return false;
+        }
+        return flinkDeployment.getSpec().getJob().getSavepointTriggerNonce()
+                != flinkDeployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getJob()
+                        .getSavepointTriggerNonce();
+    }
+
+    public static boolean gracePeriodEnded(
+            FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) {
+        int gracePeriod = configuration.getSavepointTriggerGracePeriodInSec();
+        long triggerTimestamp = savepointInfo.getTriggerTimestamp();
+        return (System.currentTimeMillis() - triggerTimestamp)
+                > TimeUnit.SECONDS.toMillis(gracePeriod);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index aaef01f..4db37c6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -198,7 +198,8 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
             if (oldJob.getState() == JobState.SUSPENDED
                     && newJob.getState() == JobState.RUNNING
                     && newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
-                    && deployment.getStatus().getJobStatus().getSavepointLocation() == null) {
+                    && (deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint()
+                            == null)) {
                 return Optional.of("Cannot perform savepoint restore without a valid savepoint");
             }
         }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 055d91b..009b17a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -27,8 +27,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
@@ -43,8 +41,6 @@ import java.util.Optional;
 /** Testing utilities. */
 public class TestUtils {
 
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-
     public static final String TEST_NAMESPACE = "flink-operator-test";
     public static final String SERVICE_ACCOUNT = "flink-operator";
     public static final String FLINK_VERSION = "latest";
@@ -77,19 +73,6 @@ public class TestUtils {
         return deployment;
     }
 
-    public static <T> T clone(T object) {
-        if (object == null) {
-            return null;
-        }
-        try {
-            return (T)
-                    objectMapper.readValue(
-                            objectMapper.writeValueAsString(object), 
object.getClass());
-        } catch (JsonProcessingException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
     public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
         return FlinkDeploymentSpec.builder()
                 .image(IMAGE)
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 2aab5ab..5da2ec5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -104,6 +107,19 @@ public class TestingFlinkService extends FlinkService {
     }
 
     @Override
+    public void triggerSavepoint(FlinkDeployment deployment, Configuration conf) throws Exception {
+        SavepointInfo savepointInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+        savepointInfo.setTriggerId("trigger_" + savepointCounter);
+        savepointInfo.setTriggerTimestamp(System.currentTimeMillis());
+    }
+
+    @Override
+    public SavepointFetchResult fetchSavepointInfo(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        return SavepointFetchResult.completed(Savepoint.of("savepoint_" + savepointCounter++));
+    }
+
+    @Override
     public boolean isJobManagerPortReady(Configuration config) {
         return isPortReady;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 60632ec..55d1628 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -131,7 +132,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
 
         // Send in invalid update
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().setJob(null);
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
@@ -162,7 +163,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals("s0", jobs.get(0).f0);
 
         List<Tuple2<String, JobStatusMessage>> previousJobs = new 
ArrayList<>(jobs);
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setInitialSavepointPath("s1");
 
         // Send in a no-op change
@@ -170,7 +171,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
 
         // Upgrade job
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
 
         testController.reconcile(appCluster, context);
@@ -180,7 +181,7 @@ public class FlinkDeploymentControllerTest {
         testController.reconcile(appCluster, context);
 
         // Suspend job
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
         testController.reconcile(appCluster, context);
         assertEquals(
@@ -188,7 +189,7 @@ public class FlinkDeploymentControllerTest {
                 appCluster.getStatus().getJobManagerDeploymentStatus());
 
         // Resume from last savepoint
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
         testController.reconcile(appCluster, TestUtils.createEmptyContext());
         jobs = flinkService.listJobs();
@@ -213,7 +214,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals("s0", jobs.get(0).f0);
 
         // Upgrade job
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
 
         UpdateControl<FlinkDeployment> updateControl =
@@ -229,12 +230,12 @@ public class FlinkDeploymentControllerTest {
         assertEquals(null, jobs.get(0).f0);
 
         // Suspend job
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
         testController.reconcile(appCluster, context);
 
         // Resume from empty state
-        appCluster = TestUtils.clone(appCluster);
+        appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
         testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
@@ -277,7 +278,7 @@ public class FlinkDeploymentControllerTest {
 
     private FlinkDeploymentController createTestController(
             KubernetesClient kubernetesClient, TestingFlinkService 
flinkService) {
-        Observer observer = new Observer(flinkService);
+        Observer observer = new Observer(flinkService, operatorConfiguration);
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
index 4e7615c..c5d73d4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.kubernetes.operator.observer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
@@ -32,6 +34,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link Observer unit tests */
@@ -39,11 +42,16 @@ public class ObserverTest {
 
     private final Context readyContext =
             JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+    private final FlinkOperatorConfiguration operatorConfiguration =
+            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
     @Test
     public void observeSessionCluster() {
         FlinkService flinkService = new TestingFlinkService();
-        Observer observer = new Observer(flinkService);
+        Observer observer =
+                new Observer(
+                        flinkService,
+                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()));
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         deployment
                 .getStatus()
@@ -74,7 +82,10 @@ public class ObserverTest {
     @Test
     public void observeApplicationCluster() {
         TestingFlinkService flinkService = new TestingFlinkService();
-        Observer observer = new Observer(flinkService);
+        Observer observer =
+                new Observer(
+                        flinkService,
+                        FlinkOperatorConfiguration.fromConfiguration(new 
Configuration()));
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
 
@@ -132,4 +143,64 @@ public class ObserverTest {
                 deployment.getStatus().getJobManagerDeploymentStatus());
         assertEquals("UNKNOWN", 
deployment.getStatus().getJobStatus().getState());
     }
+
+    @Test
+    public void observeSavepoint() throws Exception {
+        TestingFlinkService flinkService = new TestingFlinkService();
+        Observer observer =
+                new Observer(
+                        flinkService,
+                        FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+        flinkService.submitApplicationCluster(deployment, conf);
+        bringToReadyStatus(deployment);
+        assertTrue(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        flinkService.triggerSavepoint(deployment, conf);
+
+        assertEquals(
+                "trigger_0",
+                deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+        assertTrue(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                "savepoint_0",
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
+        assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+
+        flinkService.triggerSavepoint(deployment, conf);
+        assertEquals(
+                "trigger_1",
+                deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+        assertTrue(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                "savepoint_1",
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
+        assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+    }
+
+    private void bringToReadyStatus(FlinkDeployment deployment) {
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobName("jobname");
+        jobStatus.setJobId("0000000000");
+        jobStatus.setState("RUNNING");
+        deployment.getStatus().setJobStatus(jobStatus);
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index fd940d4..cc7f508 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -88,7 +87,7 @@ public class JobReconcilerTest {
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
 
         // Test stateless upgrade
-        FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
+        FlinkDeployment statelessUpgrade = 
ReconciliationUtils.clone(deployment);
         
statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
         reconciler.reconcile("test", statelessUpgrade, context, config);
@@ -103,7 +102,7 @@ public class JobReconcilerTest {
                 .setJobId(runningJobs.get(0).f1.getJobId().toHexString());
 
         // Test stateful upgrade
-        FlinkDeployment statefulUpgrade = TestUtils.clone(deployment);
+        FlinkDeployment statefulUpgrade = 
ReconciliationUtils.clone(deployment);
         
statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
 
@@ -119,7 +118,6 @@ public class JobReconcilerTest {
         final String expectedSavepointPath = "savepoint_0";
         final Context context = 
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
-        Observer observer = new Observer(flinkService);
 
         final JobReconciler reconciler =
                 new JobReconciler(null, flinkService, operatorConfiguration);
@@ -143,7 +141,12 @@ public class JobReconcilerTest {
                         
.equalsIgnoreCase(deployment.getStatus().getJobStatus().getState()));
         assertEquals(
                 expectedSavepointPath,
-                deployment.getStatus().getJobStatus().getSavepointLocation());
+                deployment
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getLastSavepoint()
+                        .getLocation());
 
         // Resume FlinkDeployment with last-state upgrade mode
         deployment
@@ -163,6 +166,29 @@ public class JobReconcilerTest {
         assertEquals(expectedSavepointPath, runningJobs.get(0).f0);
     }
 
+    @Test
+    public void triggerSavepoint() throws Exception {
+        Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+        TestingFlinkService flinkService = new TestingFlinkService();
+        JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+        reconciler.reconcile("test", deployment, context, config);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+        assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+
+        // trigger savepoint
+        FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment);
+        long oldValue = spDeployment.getSpec().getJob().getSavepointTriggerNonce();
+        spDeployment.getSpec().getJob().setSavepointTriggerNonce(oldValue + 1);
+        reconciler.reconcile("test", spDeployment, context, config);
+        assertEquals(
+                "trigger_0",
+                spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+    }
+
     private void verifyAndSetRunningJobsToStatus(
             FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> 
runningJobs) {
         assertEquals(1, runningJobs.size());
@@ -170,7 +196,7 @@ public class JobReconcilerTest {
         deployment
                 .getStatus()
                 .getReconciliationStatus()
-                .setLastReconciledSpec(TestUtils.clone(deployment.getSpec()));
+                
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
 
         JobStatus jobStatus = new JobStatus();
         jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index fd93e7d..8d6aee5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -193,7 +194,7 @@ public class FlinkConfigBuilderTest {
 
     @Test
     public void testApplyTaskManagerSpec() throws Exception {
-        FlinkDeployment deploymentClone = TestUtils.clone(flinkDeployment);
+        FlinkDeployment deploymentClone = ReconciliationUtils.clone(flinkDeployment);
         deploymentClone.getSpec().setPodTemplate(null);
 
         final Configuration configuration =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 184d3aa..bfeefb6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import org.junit.Assert;
@@ -120,12 +122,15 @@ public class DeploymentValidatorTest {
                 dep -> {
                     dep.setStatus(new FlinkDeploymentStatus());
                     dep.getStatus().setJobStatus(new JobStatus());
-                    dep.getStatus().getJobStatus().setSavepointLocation("sp");
+                    dep.getStatus()
+                            .getJobStatus()
+                            .getSavepointInfo()
+                            .setLastSavepoint(Savepoint.of("sp"));
 
                     dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                            .setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
                     dep.getStatus()
                             .getReconciliationStatus()
                             .getLastReconciledSpec()
@@ -143,7 +148,7 @@ public class DeploymentValidatorTest {
                     dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                            .setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
                     dep.getStatus()
                             .getReconciliationStatus()
                             .getLastReconciledSpec()
@@ -163,7 +168,7 @@ public class DeploymentValidatorTest {
                     dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                            .setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
                     dep.getSpec().setJob(null);
                 },
                 "Cannot switch from job to session cluster");
@@ -176,7 +181,7 @@ public class DeploymentValidatorTest {
                     dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                            .setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
                     dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null);
                 },
                 "Cannot switch from session to job cluster");
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index ec9ee63..078c0ed 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9048,6 +9048,8 @@ spec:
                     - running
                     - suspended
                     type: string
+                  savepointTriggerNonce:
+                    type: integer
                   initialSavepointPath:
                     type: string
                   upgradeMode:
@@ -9074,8 +9076,20 @@ spec:
                     type: string
                   updateTime:
                     type: string
-                  savepointLocation:
-                    type: string
+                  savepointInfo:
+                    properties:
+                      lastSavepoint:
+                        properties:
+                          timeStamp:
+                            type: integer
+                          location:
+                            type: string
+                        type: object
+                      triggerId:
+                        type: string
+                      triggerTimestamp:
+                        type: integer
+                    type: object
                 type: object
               jobManagerDeploymentStatus:
                 enum:
@@ -18121,6 +18135,8 @@ spec:
                             - running
                             - suspended
                             type: string
+                          savepointTriggerNonce:
+                            type: integer
                           initialSavepointPath:
                             type: string
                           upgradeMode:

Reply via email to