This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cb90b10c [FLINK-35126] Rework default checkpoint progress check window
cb90b10c is described below

commit cb90b10c93241f9994acdd11d59ce1ec25ccbc47
Author: Gyula Fora <[email protected]>
AuthorDate: Mon May 6 11:51:33 2024 +0200

    [FLINK-35126] Rework default checkpoint progress check window
---
 .../shortcodes/generated/dynamic_section.html      |  6 +--
 .../kubernetes_operator_config_configuration.html  |  6 +--
 .../config/KubernetesOperatorConfigOptions.java    |  6 +--
 .../operator/observer/ClusterHealthEvaluator.java  | 50 ++++++++++++++--------
 .../controller/UnhealthyDeploymentRestartTest.java |  2 +-
 .../observer/ClusterHealthEvaluatorTest.java       | 34 ++++++++++-----
 6 files changed, 66 insertions(+), 38 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 9c1083fc..be963815 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -22,15 +22,15 @@
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Whether to enable checkpoint progress health check for 
clusters.</td>
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
-            <td style="word-wrap: break-word;">5 min</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
-            <td>If no checkpoints are completed within the defined time 
window, the job is considered unhealthy. This must be bigger than checkpointing 
interval.</td>
+            <td>If no checkpoints are completed within the defined time 
window, the job is considered unhealthy. The minimum window size is 
`max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 
2)`, which also serves as the default value when checkpointing is enabled. For 
example with checkpoint interval 10 minutes and 0 tolerable failures, the 
default progress check window will be 20 minutes.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 172029c8..218e2626 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -22,15 +22,15 @@
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Whether to enable checkpoint progress health check for 
clusters.</td>
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
-            <td style="word-wrap: break-word;">5 min</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Duration</td>
-            <td>If no checkpoints are completed within the defined time 
window, the job is considered unhealthy. This must be bigger than checkpointing 
interval.</td>
+            <td>If no checkpoints are completed within the defined time 
window, the job is considered unhealthy. The minimum window size is 
`max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures + 
2)`, which also serves as the default value when checkpointing is enabled. For 
example with checkpoint interval 10 minutes and 0 tolerable failures, the 
default progress check window will be 20 minutes.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index b065d228..a9e6d322 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -502,7 +502,7 @@ public class KubernetesOperatorConfigOptions {
             OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
                     
operatorConfig("cluster.health-check.checkpoint-progress.enabled")
                             .booleanType()
-                            .defaultValue(false)
+                            .defaultValue(true)
                             .withDescription(
                                     "Whether to enable checkpoint progress 
health check for clusters.");
 
@@ -511,9 +511,9 @@ public class KubernetesOperatorConfigOptions {
             OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
                     
operatorConfig("cluster.health-check.checkpoint-progress.window")
                             .durationType()
-                            .defaultValue(Duration.ofMinutes(5))
+                            .noDefaultValue()
                             .withDescription(
-                                    "If no checkpoints are completed within 
the defined time window, the job is considered unhealthy. This must be bigger 
than checkpointing interval.");
+                                    "If no checkpoints are completed within 
the defined time window, the job is considered unhealthy. The minimum window 
size is `max(checkpointingInterval, checkpointTimeout) * 
(tolerableCheckpointFailures + 2)`, which also serves as the default value when 
checkpointing is enabled. For example with checkpoint interval 10 minutes and 0 
tolerable failures, the default progress check window will be 20 minutes.");
 
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Boolean> OPERATOR_JOB_RESTART_FAILED =
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index c84f3413..d38a8f0f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -178,30 +178,46 @@ public class ClusterHealthEvaluator {
             return true;
         }
 
-        var completedCheckpointsCheckWindow =
-                
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+        var windowOpt =
+                
configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
 
         CheckpointConfig checkpointConfig = new CheckpointConfig();
         checkpointConfig.configure(configuration);
         var checkpointingInterval = checkpointConfig.getCheckpointInterval();
         var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
-        var tolerationFailureNumber = 
checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
-        var minCompletedCheckpointsCheckWindow =
-                Math.max(
-                        checkpointingInterval * tolerationFailureNumber,
-                        checkpointingTimeout * tolerationFailureNumber);
-        if (completedCheckpointsCheckWindow.toMillis() < 
minCompletedCheckpointsCheckWindow) {
-            LOG.warn(
-                    "{} is not long enough. Default to max({} * {}, {} * {}): 
{}ms",
-                    
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
-                    CHECKPOINTING_INTERVAL.key(),
-                    TOLERABLE_FAILURE_NUMBER.key(),
-                    CHECKPOINTING_TIMEOUT.key(),
-                    TOLERABLE_FAILURE_NUMBER.key(),
-                    minCompletedCheckpointsCheckWindow);
-            completedCheckpointsCheckWindow = 
Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+        var tolerationFailureNumber = 
checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
+        var minCheckWindow =
+                Duration.ofMillis(
+                        Math.max(
+                                checkpointingInterval * 
tolerationFailureNumber,
+                                checkpointingTimeout * 
tolerationFailureNumber));
+
+        if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) 
{
+            // If no explicit checkpoint check window is specified and 
checkpointing is disabled
+            // based on the config, we don't do anything
+            return true;
         }
 
+        var completedCheckpointsCheckWindow =
+                windowOpt
+                        .filter(
+                                d -> {
+                                    if (d.compareTo(minCheckWindow) < 0) {
+                                        LOG.debug(
+                                                "{} is not long enough. 
Default to max({} * {}, {} * {}): {}",
+                                                
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
+                                                        .key(),
+                                                CHECKPOINTING_INTERVAL.key(),
+                                                TOLERABLE_FAILURE_NUMBER.key(),
+                                                CHECKPOINTING_TIMEOUT.key(),
+                                                TOLERABLE_FAILURE_NUMBER.key(),
+                                                minCheckWindow);
+                                        return false;
+                                    }
+                                    return true;
+                                })
+                        .orElse(minCheckWindow);
+
         if (observedClusterHealthInfo.getNumCompletedCheckpoints()
                 < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
             LOG.debug(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 050db7d4..c7b26d1b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -138,7 +138,7 @@ public class UnhealthyDeploymentRestartTest {
         // Ensure the last savepoint has been taken more than 10 minutes ago 
(Default checkpoint
         // interval)
         clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
-                
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
+                
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
         setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), 
clusterHealthInfo);
         testController.getStatusRecorder().patchAndCacheStatus(appCluster, 
kubernetesClient);
         testController.reconcile(appCluster, context);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index 9d58d71b..83a0e751 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -159,6 +159,8 @@ class ClusterHealthEvaluatorTest {
     @Test
     public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
         
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, 
true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, 
Duration.ofMinutes(5));
         var observedClusterHealthInfo1 = 
createClusterHealthInfo(validInstant1, 0, 1);
         var observedClusterHealthInfo2 = 
createClusterHealthInfo(validInstant2, 0, 0);
 
@@ -278,25 +280,33 @@ class ClusterHealthEvaluatorTest {
         Instant tenSecInstant = ofEpochSecond(10);
         Instant twoMinInstant = ofEpochSecond(120);
         Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);
+
+        Duration oneMin = Duration.ofMinutes(1);
         return Stream.of(
                 // 
ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
-                Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null, 
false),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 30L, 
null, false),
+                // Verify checkpoint progress even if checkpointing is not 
configured
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, null, 30L, 
null, false),
+                // Verify default window if not explicitly configured
+                Arguments.of(twoMinInstant, fourMinInstant, null, 30L, 30L, 
null, false),
+                // Verify check is off if neither window nor checkpointing is 
configured
+                Arguments.of(twoMinInstant, fourMinInstant, null, null, 30L, 
null, true),
                 // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
-                Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null, 
true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 60L, 30L, 
null, true),
                 // 
ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
-                Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null, 
false),
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 60L, 30L, 
null, false),
                 // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
-                Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 10L, 
3, true),
                 // 
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
-                Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3, 
false),
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 10L, 
3, false),
                 // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
-                Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null, 
true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 60L, 
null, true),
                 // 
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
-                Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null, 
false),
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 60L, 
null, false),
                 // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
-                Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
+                Arguments.of(twoMinInstant, fourMinInstant, oneMin, 10L, 30L, 
3, true),
                 // 
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
-                Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3, 
false));
+                Arguments.of(tenSecInstant, fourMinInstant, oneMin, 10L, 30L, 
3, false));
     }
 
     @ParameterizedTest
@@ -304,13 +314,15 @@ class ClusterHealthEvaluatorTest {
     public void evaluateCheckpointing(
             Instant validInstant1,
             Instant validInstant2,
+            Duration window,
             Long checkpointingInterval,
             long checkpointingTimeout,
             Integer tolerationFailureNumber,
             boolean expectedIsHealthy) {
         
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, 
true);
-        configuration.set(
-                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, 
Duration.ofMinutes(1));
+        if (window != null) {
+            
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, 
window);
+        }
         if (checkpointingInterval != null) {
             configuration.set(CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(checkpointingInterval));
         }

Reply via email to