This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f2f0d3  [FLINK-27257] Retry failed savepoints within grace period
4f2f0d3 is described below

commit 4f2f0d392959f70bbb7b012403f1d6c9c16477cc
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue Jun 7 11:12:48 2022 +0200

    [FLINK-27257] Retry failed savepoints within grace period
---
 .../kubernetes_operator_config_configuration.html  |  12 +-
 .../config/FlinkOperatorConfiguration.java         |   7 --
 .../config/KubernetesOperatorConfigOptions.java    |   8 +-
 .../operator/observer/SavepointObserver.java       | 138 +++++++++++----------
 .../operator/reconciler/ReconciliationUtils.java   |  17 ++-
 .../kubernetes/operator/utils/SavepointUtils.java  |  15 ++-
 .../operator/observer/SavepointObserverTest.java   |   6 +-
 .../deployment/ApplicationObserverTest.java        |  29 ++++-
 .../deployment/ApplicationReconcilerTest.java      |  24 +++-
 .../sessionjob/FlinkSessionJobReconcilerTest.java  |  17 +--
 10 files changed, 165 insertions(+), 108 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 5e9b466..08c72d0 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -68,12 +68,6 @@
             <td>Duration</td>
             <td>Final delay before deployment is marked ready after port becomes accessible.</td>
         </tr>
-        <tr>
-            <td><h5>kubernetes.operator.observer.savepoint.trigger.grace-period</h5></td>
-            <td style="word-wrap: break-word;">10 s</td>
-            <td>Duration</td>
-            <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
-        </tr>
         <tr>
             <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
             <td style="word-wrap: break-word;">0 ms</td>
@@ -122,6 +116,12 @@
             <td>Integer</td>
             <td>Maximum number of savepoint history entries to retain.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.user.artifacts.base.dir</h5></td>
             <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 69a3ef8..36b9383 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -34,7 +34,6 @@ public class FlinkOperatorConfiguration {
     int reconcilerMaxParallelism;
     Duration progressCheckInterval;
     Duration restApiReadyDelay;
-    Duration savepointTriggerGracePeriod;
     Duration flinkClientTimeout;
     String flinkServiceHostOverride;
     Set<String> watchedNamespaces;
@@ -62,11 +61,6 @@ public class FlinkOperatorConfiguration {
                 operatorConfig.get(
                         
KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL);
 
-        Duration savepointTriggerGracePeriod =
-                operatorConfig.get(
-                        KubernetesOperatorConfigOptions
-                                
.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD);
-
         Duration flinkClientTimeout =
                 operatorConfig.get(
                         
KubernetesOperatorConfigOptions.OPERATOR_OBSERVER_FLINK_CLIENT_TIMEOUT);
@@ -103,7 +97,6 @@ public class FlinkOperatorConfiguration {
                 reconcilerMaxParallelism,
                 progressCheckInterval,
                 restApiReadyDelay,
-                savepointTriggerGracePeriod,
                 flinkClientTimeout,
                 flinkServiceHostOverride,
                 watchedNamespaces,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f5eaade..b8e03b2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -57,10 +57,12 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "The interval for observing status for in-progress operations such as deployment and savepoints.");
 
-    public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =
-            ConfigOptions.key("kubernetes.operator.observer.savepoint.trigger.grace-period")
+    public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD =
+            ConfigOptions.key("kubernetes.operator.savepoint.trigger.grace-period")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(10))
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDeprecatedKeys(
+                            "kubernetes.operator.observer.savepoint.trigger.grace-period")
                     .withDescription(
                             "The interval before a savepoint trigger attempt is marked as unsuccessful.");
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index a0b5788..dc129c7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
@@ -27,12 +26,12 @@ import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusHelper;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +60,7 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
 
     public void observeSavepointStatus(
             AbstractFlinkResource<?, STATUS> resource, Configuration 
deployedConfig) {
+
         var jobStatus = resource.getStatus().getJobStatus();
         var savepointInfo = jobStatus.getSavepointInfo();
         var jobId = jobStatus.getJobId();
@@ -69,96 +69,79 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        
.getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal 
jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) 
{
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the 
status
-        // to avoid losing this in case of an operator failure after the 
cluster was shut down
-        if (currentLastSpPath != null && 
!currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last 
savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, 
previousLastSpPath);
     }
 
     /**
-     * Observe the savepoint result based on the current savepoint info.
+     * Observe the status of manually triggered savepoints.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
-     * @return The observed error, if no error observed, {@code 
Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration 
deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration 
deployedConfig) {
+
+        var savepointInfo = 
resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, 
deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {
-            if (SavepointUtils.gracePeriodEnded(
-                    configManager.getOperatorConfiguration(), 
currentSavepointInfo)) {
-                String errorMsg =
-                        "Savepoint operation timed out after "
-                                + configManager
-                                        .getOperatorConfiguration()
-                                        .getSavepointTriggerGracePeriod();
-                currentSavepointInfo.resetTrigger();
-                LOG.error(errorMsg);
-                return Optional.of(errorMsg);
-            } else {
-                LOG.info("Savepoint operation not finished yet, waiting within 
grace period...");
-                return Optional.empty();
-            }
+            LOG.info("Savepoint operation not finished yet...");
+            return;
         }
 
         if (savepointFetchResult.getError() != null) {
-            currentSavepointInfo.resetTrigger();
-            return Optional.of(savepointFetchResult.getError());
+            var err = savepointFetchResult.getError();
+            if (SavepointUtils.gracePeriodEnded(deployedConfig, 
savepointInfo)) {
+                LOG.error(
+                        "Savepoint attempt failed after grace period. Won't be 
retried again: "
+                                + err);
+                ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                        savepointInfo, resource);
+                EventUtils.createOrUpdateEvent(
+                        flinkService.getKubernetesClient(),
+                        resource,
+                        EventUtils.Type.Warning,
+                        "SavepointError",
+                        SavepointUtils.createSavepointError(
+                                savepointInfo,
+                                
resource.getSpec().getJob().getSavepointTriggerNonce()),
+                        EventUtils.Component.Operator);
+            } else {
+                LOG.warn("Savepoint failed within grace period, retrying: " + 
err);
+            }
+            savepointInfo.resetTrigger();
+            return;
         }
 
-        LOG.info("Savepoint status updated with latest completed savepoint 
info");
         var savepoint =
                 new Savepoint(
-                        currentSavepointInfo.getTriggerTimestamp(),
+                        savepointInfo.getTriggerTimestamp(),
                         savepointFetchResult.getLocation(),
-                        currentSavepointInfo.getTriggerType());
-        currentSavepointInfo.updateLastSavepoint(savepoint);
+                        savepointInfo.getTriggerType());
 
-        updateSavepointHistory(currentSavepointInfo, savepoint, 
deployedConfig);
-        return Optional.empty();
+        
ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, 
resource);
+        savepointInfo.updateLastSavepoint(savepoint);
+        cleanupSavepointHistory(savepointInfo, savepoint, deployedConfig);
     }
 
+    /** Clean up and dispose savepoints according to the configured max 
size/age. */
     @VisibleForTesting
-    void updateSavepointHistory(
+    void cleanupSavepointHistory(
             SavepointInfo currentSavepointInfo,
             Savepoint newSavepoint,
             Configuration deployedConfig) {
@@ -204,4 +187,27 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
             throw new ReconciliationException(e);
         }
     }
+
+    /**
+     * Patch the Kubernetes Flink resource status if we observed a new last 
savepoint. This is
+     * crucial to not lose this information once the reconciler shuts down the 
cluster.
+     */
+    private void patchStatusOnSavepointChange(
+            AbstractFlinkResource<?, STATUS> resource,
+            SavepointInfo savepointInfo,
+            String previousLastSpPath) {
+        var currentLastSpPath =
+                Optional.ofNullable(savepointInfo.getLastSavepoint())
+                        .map(Savepoint::getLocation)
+                        .orElse(null);
+
+        // If the last savepoint information changes we need to patch the 
status
+        // to avoid losing this in case of an operator failure after the 
cluster was shut down
+        if (currentLastSpPath != null && 
!currentLastSpPath.equals(previousLastSpPath)) {
+            LOG.info(
+                    "Updating resource status after observing new last 
savepoint {}",
+                    currentLastSpPath);
+            statusHelper.patchAndCacheStatus(resource);
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 4a21170..cd28a57 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -33,6 +33,8 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -95,13 +97,18 @@ public class ReconciliationUtils {
         }
     }
 
-    public static <SPEC extends AbstractFlinkSpec> void updateSavepointReconciliationSuccess(
-            AbstractFlinkResource<SPEC, ?> target) {
+    public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce(
+            SavepointInfo savepointInfo, AbstractFlinkResource<SPEC, ?> target) {
+
+        // We only need to update for MANUAL triggers
+        if (savepointInfo.getTriggerType() != SavepointTriggerType.MANUAL) {
+            return;
+        }
+
         var commonStatus = target.getStatus();
         var spec = target.getSpec();
-        ReconciliationStatus<SPEC> reconciliationStatus = 
commonStatus.getReconciliationStatus();
-        commonStatus.setError(null);
-        SPEC lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
+        var reconciliationStatus = commonStatus.getReconciliationStatus();
+        var lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
         lastReconciledSpec
                 .getJob()
                 
.setSavepointTriggerNonce(spec.getJob().getSavepointTriggerNonce());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index c4ca34e..6858281 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
@@ -70,9 +69,6 @@ public class SavepointUtils {
                 resource.getStatus().getJobStatus().getSavepointInfo(),
                 conf);
 
-        if (triggerType == SavepointTriggerType.MANUAL) {
-            ReconciliationUtils.updateSavepointReconciliationSuccess(resource);
-        }
         return true;
     }
 
@@ -135,10 +131,12 @@ public class SavepointUtils {
         return Optional.empty();
     }
 
-    public static boolean gracePeriodEnded(
-            FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) {
-        var elapsed = System.currentTimeMillis() - savepointInfo.getTriggerTimestamp();
-        return elapsed > configuration.getSavepointTriggerGracePeriod().toMillis();
+    public static boolean gracePeriodEnded(Configuration conf, SavepointInfo savepointInfo) {
+        Duration gracePeriod =
+                conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
+        var endOfGracePeriod =
+                Instant.ofEpochMilli(savepointInfo.getTriggerTimestamp()).plus(gracePeriod);
+        return endOfGracePeriod.isBefore(Instant.now());
     }
 
     public static void resetTriggerIfJobNotRunning(
@@ -148,6 +146,7 @@ public class SavepointUtils {
         if (!ReconciliationUtils.isJobRunning(status)
                 && SavepointUtils.savepointInProgress(jobStatus)) {
             var savepointInfo = jobStatus.getSavepointInfo();
+            
ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, 
resource);
             savepointInfo.resetTrigger();
             LOG.error("Job is not running, cancelling savepoint operation");
             EventUtils.createOrUpdateEvent(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index e3ff8d0..ef335b5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -53,7 +53,7 @@ public class SavepointObserverTest {
 
         Savepoint sp = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
         spInfo.updateLastSavepoint(sp);
-        observer.updateSavepointHistory(spInfo, sp, 
configManager.getDefaultConfig());
+        observer.cleanupSavepointHistory(spInfo, sp, 
configManager.getDefaultConfig());
 
         Assertions.assertNotNull(spInfo.getSavepointHistory());
         Assertions.assertIterableEquals(
@@ -73,7 +73,7 @@ public class SavepointObserverTest {
 
         Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
         spInfo.updateLastSavepoint(sp1);
-        observer.updateSavepointHistory(spInfo, sp1, conf);
+        observer.cleanupSavepointHistory(spInfo, sp1, conf);
         Assertions.assertIterableEquals(
                 Collections.singletonList(sp1), spInfo.getSavepointHistory());
         Assertions.assertIterableEquals(
@@ -81,7 +81,7 @@ public class SavepointObserverTest {
 
         Savepoint sp2 = new Savepoint(2, "sp2", SavepointTriggerType.MANUAL);
         spInfo.updateLastSavepoint(sp2);
-        observer.updateSavepointHistory(spInfo, sp2, conf);
+        observer.cleanupSavepointHistory(spInfo, sp2, conf);
         Assertions.assertIterableEquals(
                 Collections.singletonList(sp2), spInfo.getSavepointHistory());
         Assertions.assertIterableEquals(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 3963b97..104801c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -39,6 +39,9 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+import java.time.Instant;
+
 import static org.junit.Assert.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -185,7 +188,7 @@ public class ApplicationObserverTest {
         
assertTrue(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
 
         
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown");
-        // savepoint error
+        // savepoint error within grace period
         assertEquals(
                 0,
                 kubernetesClient
@@ -197,6 +200,30 @@ public class ApplicationObserverTest {
                         .size());
         observer.observe(deployment, readyContext);
         
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
+        assertEquals(
+                0,
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(deployment.getMetadata().getNamespace())
+                        .list()
+                        .getItems()
+                        .size());
+
+        
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown");
+        deployment
+                .getStatus()
+                .getJobStatus()
+                .getSavepointInfo()
+                .setTriggerType(SavepointTriggerType.MANUAL);
+        deployment
+                .getStatus()
+                .getJobStatus()
+                .getSavepointInfo()
+                
.setTriggerTimestamp(Instant.now().minus(Duration.ofHours(1)).toEpochMilli());
+
+        observer.observe(deployment, readyContext);
+        
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
         assertEquals(
                 1,
                 kubernetesClient
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index f631141..81978fb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -222,7 +222,6 @@ public class ApplicationReconcilerTest {
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
         
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
 
-        // trigger savepoint
         FlinkDeployment spDeployment = ReconciliationUtils.clone(deployment);
 
         // don't trigger if nonce is missing
@@ -235,6 +234,13 @@ public class ApplicationReconcilerTest {
                 .getJob()
                 
.setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
         reconciler.reconcile(spDeployment, context);
+        assertNull(
+                spDeployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getSavepointTriggerNonce());
         assertEquals(
                 "trigger_0",
                 
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
@@ -250,12 +256,16 @@ public class ApplicationReconcilerTest {
                 
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
 
         
spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                spDeployment.getStatus().getJobStatus().getSavepointInfo(), 
spDeployment);
 
         // don't trigger when nonce is the same
         reconciler.reconcile(spDeployment, context);
         
assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
 
         
spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                spDeployment.getStatus().getJobStatus().getSavepointInfo(), 
spDeployment);
 
         // trigger when new nonce is defined
         spDeployment
@@ -270,7 +280,19 @@ public class ApplicationReconcilerTest {
                 SavepointTriggerType.MANUAL,
                 
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
 
+        // re-trigger after reset
+        
spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        reconciler.reconcile(spDeployment, context);
+        assertEquals(
+                "trigger_2",
+                
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
+        assertEquals(
+                SavepointTriggerType.MANUAL,
+                
spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
+
         
spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                spDeployment.getStatus().getJobStatus().getSavepointInfo(), 
spDeployment);
 
         // don't trigger nonce is cleared
         spDeployment.getSpec().getJob().setSavepointTriggerNonce(null);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
index fd96602..8167292 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -278,8 +279,7 @@ public class FlinkSessionJobReconcilerTest {
         
assertTrue(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
 
         // the last reconcile nonce updated
-        assertEquals(
-                2L,
+        assertNull(
                 sp1SessionJob
                         .getStatus()
                         .getReconciliationStatus()
@@ -323,6 +323,8 @@ public class FlinkSessionJobReconcilerTest {
                         .getParallelism());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
 
         // running -> suspended
         reconciler.reconcile(sp1SessionJob, readyContext);
@@ -345,20 +347,19 @@ public class FlinkSessionJobReconcilerTest {
                 flinkService.listSessionJobs());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
-
-        // don't trigger when nonce is the same
-        sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(2L);
-        reconciler.reconcile(sp1SessionJob, readyContext);
-        
assertFalse(SavepointUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
 
         // trigger when new nonce is defined
-        sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(3L);
+        sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(4L);
         reconciler.reconcile(sp1SessionJob, readyContext);
         assertEquals(
                 "trigger_1",
                 
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
+        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
+                sp1SessionJob.getStatus().getJobStatus().getSavepointInfo(), 
sp1SessionJob);
 
         // don't trigger when nonce is cleared
         sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(null);

Reply via email to