This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new fc1b901c [FLINK-30781] Add completed checkpoint check to cluster 
health check
fc1b901c is described below

commit fc1b901c42fcd838f12254e2b47dd471f5eef8b9
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue Jan 24 17:59:23 2023 +0100

    [FLINK-30781] Add completed checkpoint check to cluster health check
---
 .../content/docs/custom-resource/job-management.md |   5 +-
 .../shortcodes/generated/dynamic_section.html      |  12 ++
 .../kubernetes_operator_config_configuration.html  |  12 ++
 .../config/KubernetesOperatorConfigOptions.java    |  18 ++
 .../operator/health/ClusterHealthInfo.java         |  20 +-
 .../operator/observer/ClusterHealthEvaluator.java  | 170 +++++++++++-----
 .../operator/observer/ClusterHealthObserver.java   |  21 +-
 .../deployment/AbstractJobReconciler.java          |   4 +-
 .../deployment/ApplicationReconciler.java          |   9 +-
 .../controller/UnhealthyDeploymentRestartTest.java |  48 +++++
 .../operator/health/ClusterHealthInfoTest.java     |  22 +-
 .../observer/ClusterHealthEvaluatorTest.java       | 222 +++++++++++++++------
 .../deployment/ApplicationReconcilerTest.java      |   5 +-
 13 files changed, 435 insertions(+), 133 deletions(-)

diff --git a/docs/content/docs/custom-resource/job-management.md 
b/docs/content/docs/custom-resource/job-management.md
index 54013478..da0fc4aa 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -238,8 +238,11 @@ When Kubernetes HA is enabled, the operator can restart 
the Flink cluster deploy
 unhealthy. Unhealthy deployment restart can be turned on in the configuration 
by setting `kubernetes.operator.cluster.health-check.enabled` to `true` 
(default: `false`).  
 In order for this feature to work, one must enable [recovery of missing job 
deployments](#recovery-of-missing-job-deployments).
 
-At the moment deployment is considered unhealthy when Flink's restarts count 
reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: 
`64`)
+At the moment deployment is considered unhealthy when:
+* Flink's restarts count reaches 
`kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`)
 within time window of 
`kubernetes.operator.cluster.health-check.restarts.window` (default: 2 minutes).
+* `kubernetes.operator.cluster.health-check.checkpoint-progress.enabled` is turned on and Flink's 
successful checkpoints count is not
+changing within time window of 
`kubernetes.operator.cluster.health-check.checkpoint-progress.window` (default: 
5 minutes).
 
 ## Restart failed job deployments
 
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 51e1b313..707024b6 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable checkpoint progress health check for 
clusters.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>If no checkpoints are completed within the defined time 
window, the job is considered unhealthy. This must be bigger than checkpointing 
interval.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 5e69a439..69be9ba9 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable checkpoint progress health check for 
clusters.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>If no checkpoints are completed within the defined time 
window, the job is considered unhealthy. This must be bigger than checkpointing 
interval.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index ea769568..ea1c6f18 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -379,6 +379,24 @@ public class KubernetesOperatorConfigOptions {
                             "The threshold which is checked against job 
restart count within a configured window. "
                                     + "If the restart count is reaching the 
threshold then full cluster restart is initiated.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean>
+            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
+                    
operatorConfig("cluster.health-check.checkpoint-progress.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to enable checkpoint progress 
health check for clusters.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Duration>
+            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
+                    
operatorConfig("cluster.health-check.checkpoint-progress.window")
+                            .durationType()
+                            .defaultValue(Duration.ofMinutes(5))
+                            .withDescription(
+                                    "If no checkpoints are completed within 
the defined time window, the job is considered unhealthy. This must be bigger 
than checkpointing interval.");
+
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Boolean> OPERATOR_JOB_RESTART_FAILED =
             operatorConfig("job.restart.failed")
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
index bb76f862..8bfd1dec 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 import java.time.Clock;
 
@@ -31,7 +30,6 @@ import java.time.Clock;
 @Experimental
 @Data
 @AllArgsConstructor
-@NoArgsConstructor
 public class ClusterHealthInfo {
     /** Millisecond timestamp of the last observed health information. */
     private long timeStamp;
@@ -39,15 +37,25 @@ public class ClusterHealthInfo {
     /** Number of restarts. */
     private int numRestarts;
 
+    /** Millisecond timestamp of the last evaluation of the number of restarts. */
+    private long numRestartsEvaluationTimeStamp;
+
+    /** Number of successfully completed checkpoints. */
+    private int numCompletedCheckpoints;
+
+    /** Millisecond timestamp of the last increase in the number of completed 
checkpoints. */
+    private long numCompletedCheckpointsIncreasedTimeStamp;
+
     /** Calculated field whether the cluster is healthy or not. */
     private boolean healthy;
 
-    public static ClusterHealthInfo of(int numRestarts) {
-        return of(Clock.systemDefaultZone(), numRestarts);
+    public ClusterHealthInfo() {
+        this(Clock.systemDefaultZone());
     }
 
-    public static ClusterHealthInfo of(Clock clock, int numRestarts) {
-        return new ClusterHealthInfo(clock.millis(), numRestarts, true);
+    public ClusterHealthInfo(Clock clock) {
+        timeStamp = clock.millis();
+        healthy = true;
     }
 
     public static boolean isValid(ClusterHealthInfo clusterHealthInfo) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index 4f5f3dc6..6a39586f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -27,6 +27,8 @@ import java.time.Clock;
 import java.time.Duration;
 import java.util.Map;
 
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
 
@@ -75,6 +77,10 @@ public class ClusterHealthEvaluator {
             var lastValidClusterHealthInfo = 
getLastValidClusterHealthInfo(clusterInfo);
             if (lastValidClusterHealthInfo == null) {
                 LOG.debug("No last valid health info, skipping health check");
+                observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                        observedClusterHealthInfo.getTimeStamp());
+                
observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                        observedClusterHealthInfo.getTimeStamp());
                 setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
             } else if (observedClusterHealthInfo.getTimeStamp()
                     < lastValidClusterHealthInfo.getTimeStamp()) {
@@ -82,65 +88,127 @@ public class ClusterHealthEvaluator {
                         "Observed health info timestamp is less than the last 
valid health info timestamp, this indicates a bug...";
                 LOG.error(msg);
                 throw new IllegalStateException(msg);
-            } else if (observedClusterHealthInfo.getNumRestarts()
-                    < lastValidClusterHealthInfo.getNumRestarts()) {
-                LOG.debug(
-                        "Observed health info number of restarts is less than 
the last valid health info number of restarts, skipping health check");
-                setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
             } else {
-                boolean isHealthy = true;
-
                 LOG.debug("Valid health info exist, checking cluster health");
                 LOG.debug("Last valid health info: {}", 
lastValidClusterHealthInfo);
                 LOG.debug("Observed health info: {}", 
observedClusterHealthInfo);
 
-                var timestampDiffMs =
-                        observedClusterHealthInfo.getTimeStamp()
-                                - lastValidClusterHealthInfo.getTimeStamp();
-                LOG.debug(
-                        "Time difference between health infos: {}",
-                        Duration.ofMillis(timestampDiffMs));
-
-                var restartCheckWindow =
-                        
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
-                var restartCheckWindowMs = restartCheckWindow.toMillis();
-                double countMultiplier = (double) restartCheckWindowMs / 
(double) timestampDiffMs;
-                // If the 2 health info timestamp difference is within the 
window then no
-                // scaling needed
-                if (countMultiplier > 1) {
-                    countMultiplier = 1;
-                }
-                long numRestarts =
-                        (long)
-                                ((double)
-                                                
(observedClusterHealthInfo.getNumRestarts()
-                                                        - 
lastValidClusterHealthInfo
-                                                                
.getNumRestarts())
-                                        * countMultiplier);
-                LOG.debug(
-                        "Calculated restart count for {} window: {}",
-                        restartCheckWindow,
-                        numRestarts);
-
-                if (lastValidClusterHealthInfo.getTimeStamp()
-                        < clock.millis() - restartCheckWindowMs) {
-                    LOG.debug("Last valid health info timestamp is outside of 
the window");
-                    setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
-                }
-
-                var restartThreshold =
-                        
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
-                if (numRestarts > restartThreshold) {
-                    LOG.info("Restart count hit threshold: {}", 
restartThreshold);
-                    setLastValidClusterHealthInfo(clusterInfo, 
observedClusterHealthInfo);
-                    isHealthy = false;
-                }
-
-                // Update the health flag
-                lastValidClusterHealthInfo = 
getLastValidClusterHealthInfo(clusterInfo);
+                boolean isHealthy =
+                        evaluateRestarts(
+                                        configuration,
+                                        clusterInfo,
+                                        lastValidClusterHealthInfo,
+                                        observedClusterHealthInfo)
+                                && evaluateCheckpoints(
+                                        configuration,
+                                        lastValidClusterHealthInfo,
+                                        observedClusterHealthInfo);
+
+                
lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
                 lastValidClusterHealthInfo.setHealthy(isHealthy);
                 setLastValidClusterHealthInfo(clusterInfo, 
lastValidClusterHealthInfo);
             }
         }
     }
+
+    private boolean evaluateRestarts(
+            Configuration configuration,
+            Map<String, String> clusterInfo,
+            ClusterHealthInfo lastValidClusterHealthInfo,
+            ClusterHealthInfo observedClusterHealthInfo) {
+
+        if (observedClusterHealthInfo.getNumRestarts()
+                < lastValidClusterHealthInfo.getNumRestarts()) {
+            LOG.debug(
+                    "Observed health info number of restarts is less than in 
the last valid health info, skipping health check");
+            
lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
+            lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+            return true;
+        }
+
+        var timestampDiffMs =
+                observedClusterHealthInfo.getTimeStamp()
+                        - 
lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp();
+        LOG.debug("Time difference between health infos: {}", 
Duration.ofMillis(timestampDiffMs));
+
+        var restartCheckWindow = 
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
+        var restartCheckWindowMs = restartCheckWindow.toMillis();
+        double countMultiplier = (double) restartCheckWindowMs / (double) 
timestampDiffMs;
+        // If the 2 health info timestamp difference is within the window then 
no
+        // scaling needed
+        if (countMultiplier > 1) {
+            countMultiplier = 1;
+        }
+        long numRestarts =
+                (long)
+                        ((double)
+                                        
(observedClusterHealthInfo.getNumRestarts()
+                                                - 
lastValidClusterHealthInfo.getNumRestarts())
+                                * countMultiplier);
+        LOG.debug("Calculated restart count for {} window: {}", 
restartCheckWindow, numRestarts);
+
+        var restartThreshold = 
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
+        boolean isHealthy = numRestarts <= restartThreshold;
+        if (!isHealthy) {
+            LOG.info("Restart count hit threshold: {}", restartThreshold);
+        }
+
+        if (lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp()
+                < clock.millis() - restartCheckWindowMs) {
+            LOG.debug(
+                    "Last valid number of restarts evaluation timestamp is 
outside of the window");
+            
lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
+            lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+        }
+
+        return isHealthy;
+    }
+
+    private boolean evaluateCheckpoints(
+            Configuration configuration,
+            ClusterHealthInfo lastValidClusterHealthInfo,
+            ClusterHealthInfo observedClusterHealthInfo) {
+        if 
(!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED))
 {
+            return true;
+        }
+
+        if (observedClusterHealthInfo.getNumCompletedCheckpoints()
+                < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
+            LOG.debug(
+                    "Observed health info number of completed checkpoints is 
less than in the last valid health info, skipping health check");
+            lastValidClusterHealthInfo.setNumCompletedCheckpoints(
+                    observedClusterHealthInfo.getNumCompletedCheckpoints());
+            
lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+            return true;
+        }
+
+        var timestampDiffMs =
+                observedClusterHealthInfo.getTimeStamp()
+                        - 
lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
+        LOG.debug("Time difference between health infos: {}", 
Duration.ofMillis(timestampDiffMs));
+
+        boolean isHealthy = true;
+        var completedCheckpointsCheckWindow =
+                
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+        var completedCheckpointsCheckWindowMs = 
completedCheckpointsCheckWindow.toMillis();
+
+        if (observedClusterHealthInfo.getNumCompletedCheckpoints()
+                > lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
+            LOG.debug("Last valid number of completed checkpoints increased 
marking timestamp");
+            lastValidClusterHealthInfo.setNumCompletedCheckpoints(
+                    observedClusterHealthInfo.getNumCompletedCheckpoints());
+            
lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+        } else if 
(lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp()
+                        + completedCheckpointsCheckWindowMs
+                < clock.millis()) {
+            LOG.info("Cluster is not able to complete checkpoints");
+            isHealthy = false;
+        }
+
+        return isHealthy;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
index 6f63cfd2..0dae9353 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
@@ -33,6 +33,8 @@ public class ClusterHealthObserver {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClusterHealthObserver.class);
     private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
     private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
+    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME =
+            "numberOfCompletedCheckpoints";
     private final ClusterHealthEvaluator clusterHealthEvaluator;
 
     public ClusterHealthObserver() {
@@ -56,22 +58,23 @@ public class ClusterHealthObserver {
                             .getMetrics(
                                     ctx.getObserveConfig(),
                                     jobId,
-                                    List.of(FULL_RESTARTS_METRIC_NAME, 
NUM_RESTARTS_METRIC_NAME));
-            ClusterHealthInfo observedClusterHealthInfo;
+                                    List.of(
+                                            FULL_RESTARTS_METRIC_NAME,
+                                            NUM_RESTARTS_METRIC_NAME,
+                                            
NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));
+            ClusterHealthInfo observedClusterHealthInfo = new 
ClusterHealthInfo();
             if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
                 LOG.debug(NUM_RESTARTS_METRIC_NAME + " metric is used");
-                observedClusterHealthInfo =
-                        ClusterHealthInfo.of(
-                                
Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
+                observedClusterHealthInfo.setNumRestarts(
+                        
Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
             } else if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
                 LOG.debug(
                         FULL_RESTARTS_METRIC_NAME
                                 + " metric is used because "
                                 + NUM_RESTARTS_METRIC_NAME
                                 + " is missing");
-                observedClusterHealthInfo =
-                        ClusterHealthInfo.of(
-                                
Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
+                observedClusterHealthInfo.setNumRestarts(
+                        
Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
             } else {
                 throw new IllegalStateException(
                         "No job restart metric found. Either "
@@ -80,6 +83,8 @@ public class ClusterHealthObserver {
                                 + NUM_RESTARTS_METRIC_NAME
                                 + "(new) must exist.");
             }
+            observedClusterHealthInfo.setNumCompletedCheckpoints(
+                    
Integer.parseInt(metrics.get(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME)));
             LOG.debug("Observed cluster health: {}", 
observedClusterHealthInfo);
 
             clusterHealthEvaluator.evaluate(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index e9c170ca..b9e2fa6d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -249,7 +249,9 @@ public abstract class AbstractJobReconciler<
             throws Exception {
         LOG.info("Resubmitting Flink job...");
         SPEC specToRecover = 
ReconciliationUtils.getDeployedSpec(ctx.getResource());
-        specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        if (requireHaMetadata) {
+            specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        }
         restoreJob(ctx, specToRecover, ctx.getObserveConfig(), 
requireHaMetadata);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 13317e65..4db3302e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -145,6 +145,10 @@ public class ApplicationReconciler
         var relatedResource = ctx.getResource();
         var status = relatedResource.getStatus();
         var flinkService = ctx.getFlinkService();
+
+        ClusterHealthEvaluator.removeLastValidClusterHealthInfo(
+                relatedResource.getStatus().getClusterInfo());
+
         if (savepoint.isPresent()) {
             deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, 
savepoint.get());
         } else {
@@ -271,7 +275,10 @@ public class ApplicationReconciler
                         MSG_RESTART_UNHEALTHY);
                 cleanupAfterFailedJob(ctx);
             }
-            resubmitJob(ctx, true);
+            boolean requireHaMetadata =
+                    
ReconciliationUtils.getDeployedSpec(ctx.getResource()).getJob().getUpgradeMode()
+                            != UpgradeMode.STATELESS;
+            resubmitJob(ctx, requireHaMetadata);
             return true;
         }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index e5814a0a..e643c0d2 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -33,7 +33,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Duration;
+
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** @link Unhealthy deployment restart tests */
@@ -42,6 +47,9 @@ public class UnhealthyDeploymentRestartTest {
 
     private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
 
+    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME =
+            "numberOfCompletedCheckpoints";
+
     private FlinkConfigManager configManager;
 
     private TestingFlinkService flinkService;
@@ -54,12 +62,18 @@ public class UnhealthyDeploymentRestartTest {
     public void setup() {
         var configuration = new Configuration();
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED, true);
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 
64);
+        
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, 
true);
+        
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, 
Duration.ZERO);
         configManager = new FlinkConfigManager(configuration);
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
                 new TestingFlinkDeploymentController(configManager, 
kubernetesClient, flinkService);
         
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
+
+        flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
+        
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
     }
 
     @ParameterizedTest
@@ -95,4 +109,38 @@ public class UnhealthyDeploymentRestartTest {
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
     }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
+    public void verifyApplicationNoCompletedCheckpointsJmRecovery(
+            FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws 
Exception {
+        FlinkDeployment appCluster = 
TestUtils.buildApplicationCluster(flinkVersion);
+        appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+        // Start a healthy deployment
+        
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+
+        // Make deployment unhealthy
+        
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // After restart the deployment is healthy again
+        
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "2");
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
index 2b6707c0..77f8c009 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
@@ -32,19 +32,33 @@ class ClusterHealthInfoTest {
     @Test
     public void isValidShouldReturnFalseWhenTimestampIsZero() {
         var clock = Clock.fixed(ofEpochSecond(0), ZoneId.systemDefault());
-        assertFalse(ClusterHealthInfo.isValid(ClusterHealthInfo.of(clock, 0)));
+        assertFalse(ClusterHealthInfo.isValid(new ClusterHealthInfo(clock)));
     }
 
     @Test
     public void isValidShouldReturnTrueWhenTimestampIsNonzero() {
         var clock = Clock.fixed(ofEpochSecond(1), ZoneId.systemDefault());
-        assertTrue(ClusterHealthInfo.isValid(ClusterHealthInfo.of(clock, 0)));
+        assertTrue(ClusterHealthInfo.isValid(new ClusterHealthInfo(clock)));
+    }
+
+    @Test
+    public void deserializeWithOldVersionShouldDeserializeCorrectly() {
+        var clusterHealthInfoJson = "{\"timeStamp\":1,\"numRestarts\":2,\"healthy\":true}";
+        var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);
+        assertEquals(1, clusterHealthInfoFromJson.getTimeStamp());
+        assertEquals(2, clusterHealthInfoFromJson.getNumRestarts());
+        assertTrue(clusterHealthInfoFromJson.isHealthy());
     }
 
     @Test
     public void serializationRoundTrip() {
-        var clock = Clock.fixed(ofEpochSecond(123), ZoneId.systemDefault());
-        var clusterHealthInfo = ClusterHealthInfo.of(clock, 456);
+        var clock = Clock.fixed(ofEpochSecond(1), ZoneId.systemDefault());
+        var clusterHealthInfo = new ClusterHealthInfo(clock);
+        clusterHealthInfo.setNumRestarts(2);
+        clusterHealthInfo.setNumRestartsEvaluationTimeStamp(3);
+        clusterHealthInfo.setNumCompletedCheckpoints(4);
+        clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(5);
+        clusterHealthInfo.setHealthy(false);
         var clusterHealthInfoJson = ClusterHealthInfo.serialize(clusterHealthInfo);
 
         var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index e6cf9a63..d7748fe0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
 
 import static java.time.Instant.ofEpochMilli;
 import static java.time.Instant.ofEpochSecond;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,7 +49,8 @@ class ClusterHealthEvaluatorTest {
     private Map<String, String> clusterInfo;
     private ClusterHealthEvaluator clusterHealthEvaluator;
     private final Instant invalidInstant = ofEpochMilli(0);
-    private final Instant validInstant = ofEpochMilli(1);
+    private final Instant validInstant1 = ofEpochSecond(120);
+    private final Instant validInstant2 = validInstant1.plus(2, ChronoUnit.MINUTES);
     private ClusterHealthInfo invalidClusterHealthInfo;
 
     @BeforeEach
@@ -56,11 +59,11 @@ class ClusterHealthEvaluatorTest {
 
         clusterInfo = new HashMap<>();
 
-        var now = Clock.fixed(ofEpochSecond(120), ZoneId.systemDefault());
-        clusterHealthEvaluator = new ClusterHealthEvaluator(now);
-
         var clock = Clock.fixed(invalidInstant, ZoneId.systemDefault());
-        invalidClusterHealthInfo = ClusterHealthInfo.of(clock, 0);
+        invalidClusterHealthInfo = new ClusterHealthInfo(clock);
+
+        var now = Clock.fixed(validInstant2, ZoneId.systemDefault());
+        clusterHealthEvaluator = new ClusterHealthEvaluator(now);
     }
 
     @Test
@@ -71,18 +74,14 @@ class ClusterHealthEvaluatorTest {
 
     @Test
     public void evaluateShouldSetLastStateWhenValidObserved() {
-        var observedClusterHealthInfo = createClusterHealthInfo(validInstant, 0);
-
-        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo);
-        assertEquals(
-                observedClusterHealthInfo,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+        var observedClusterHealthInfo = createClusterHealthInfo(validInstant1, 0, 1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo);
     }
 
     @Test
     public void evaluateShouldThrowExceptionWhenObservedTimestampIsOld() {
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant.plusMillis(100), 0);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 1);
 
         ClusterHealthEvaluator.setLastValidClusterHealthInfo(
                 clusterInfo, observedClusterHealthInfo2);
@@ -94,103 +93,206 @@ class ClusterHealthEvaluatorTest {
     }
 
     @Test
-    public void evaluateShouldOverwriteLastStateWhenRestartCountIsLess() {
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 1);
-        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant, 0);
+    public void evaluateShouldOverwriteRestartCountWhenLess() {
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 1, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 1);
 
-        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo1);
-        assertEquals(
-                observedClusterHealthInfo1,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
         assertEquals(
-                observedClusterHealthInfo2,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+                observedClusterHealthInfo2.getNumRestarts(),
+                lastValidClusterHealthInfo.getNumRestarts());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp());
     }
 
     @Test
-    public void evaluateShouldNotOverwriteLastStateWhenTimestampIsInWindow() {
+    public void evaluateShouldNotOverwriteRestartCountWhenTimestampIsInWindow() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(2));
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 0);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 1, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
         assertEquals(
-                observedClusterHealthInfo1,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+                observedClusterHealthInfo1.getNumRestarts(),
+                lastValidClusterHealthInfo.getNumRestarts());
+        assertEquals(
+                observedClusterHealthInfo1.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp());
     }
 
     @Test
-    public void evaluateShouldOverwriteLastStateWhenTimestampIsOutOfWindow() {
+    public void evaluateShouldOverwriteRestartCountWhenTimestampIsOutOfWindow() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(1));
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 0);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 1, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
         assertEquals(
-                observedClusterHealthInfo2,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+                observedClusterHealthInfo2.getNumRestarts(),
+                lastValidClusterHealthInfo.getNumRestarts());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp());
+    }
+
+    @Test
+    public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
+        assertEquals(
+                observedClusterHealthInfo2.getNumCompletedCheckpoints(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpoints());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp());
+    }
+
+    @Test
+    public void evaluateShouldOverwriteCompletedCheckpointWhenIncreased() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(2));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 2);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
+        assertEquals(
+                observedClusterHealthInfo2.getNumCompletedCheckpoints(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpoints());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp());
+    }
+
+    @Test
+    public void evaluateShouldNotOverwriteCompletedCheckpointWhenNotIncreased() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(2));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 1);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
+        assertEquals(
+                observedClusterHealthInfo1.getNumCompletedCheckpoints(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpoints());
+        assertEquals(
+                observedClusterHealthInfo1.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp());
     }
 
     @Test
     public void evaluateShouldMarkClusterHealthyWhenNoPreviousState() {
-        var observedClusterHealthInfo = createClusterHealthInfo(validInstant, 1);
+        var observedClusterHealthInfo = createClusterHealthInfo(validInstant1, 1, 1);
 
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo);
         assertClusterHealthIs(true);
     }
 
     @Test
-    public void evaluateShouldMarkClusterHealthyWhenThresholdNotHit() {
+    public void evaluateShouldMarkClusterHealthyWhenRestartThresholdNotHit() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(5));
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 100);
-        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 100);
+        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 100, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
         assertClusterHealthIs(true);
     }
 
     @Test
-    public void evaluateShouldMarkClusterUnhealthyWhenThresholdHitImmediately() {
+    public void evaluateShouldMarkClusterUnhealthyWhenRestartThresholdHitImmediately() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(5));
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 100);
-        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 101);
+        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 101, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
         assertClusterHealthIs(false);
     }
 
     @Test
-    public void evaluateShouldMarkClusterUnhealthyWhenThresholdHitInAverage() {
-        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(5));
+    public void evaluateShouldMarkClusterUnhealthyWhenRestartThresholdHitInAverage() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(1));
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 100);
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(6, ChronoUnit.MINUTES), 122);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 500, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        assertClusterHealthIs(false);
+    }
+
+    @Test
+    public void evaluateShouldMarkClusterHealthyWhenNoCompletedCheckpointsInsideWindow() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(3));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 0);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        assertClusterHealthIs(true);
+    }
+
+    @Test
+    public void evaluateShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(1));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 0);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
         assertClusterHealthIs(false);
     }
 
-    private ClusterHealthInfo createClusterHealthInfo(Instant instant, int numRestarts) {
+    private ClusterHealthInfo createClusterHealthInfo(
+            Instant instant, int numRestarts, int numCompletedCheckpoints) {
         var clock = Clock.fixed(instant, ZoneId.systemDefault());
-        return ClusterHealthInfo.of(clock, numRestarts);
+        var clusterHealthInfo = new ClusterHealthInfo(clock);
+        clusterHealthInfo.setNumRestarts(numRestarts);
+        clusterHealthInfo.setNumCompletedCheckpoints(numCompletedCheckpoints);
+        return clusterHealthInfo;
+    }
+
+    private void setLastValidClusterHealthInfo(ClusterHealthInfo clusterHealthInfo) {
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, clusterHealthInfo);
+        assertEquals(
+                clusterHealthInfo,
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
     }
 
     private void assertClusterHealthIs(boolean healthy) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 4ca8618d..063986eb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -691,7 +691,10 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
         Assertions.assertEquals(MSG_SUBMIT, eventCollector.events.remove().getMessage());
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
 
-        var clusterHealthInfo = new ClusterHealthInfo(System.currentTimeMillis(), 2, false);
+        var clusterHealthInfo = new ClusterHealthInfo();
+        clusterHealthInfo.setTimeStamp(System.currentTimeMillis());
+        clusterHealthInfo.setNumRestarts(2);
+        clusterHealthInfo.setHealthy(false);
         ClusterHealthEvaluator.setLastValidClusterHealthInfo(
                 deployment.getStatus().getClusterInfo(), clusterHealthInfo);
         reconciler.reconcile(deployment, context);

Reply via email to