This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 775bc5f4 [FLINK-34131] Ensure 
cluster.health-check.checkpoint-progress.window is well configure
775bc5f4 is described below

commit 775bc5f41df09accefca6590112509c6023b2fb4
Author: [email protected] <[email protected]>
AuthorDate: Thu Jan 18 09:41:47 2024 +0100

    [FLINK-34131] Ensure cluster.health-check.checkpoint-progress.window is 
well configure
    
    cluster.health-check.checkpoint-progress.window must be greater than
    max(execution.checkpointing.interval * 
execution.checkpointing.tolerable-failed-checkpoints,
    execution.checkpointing.timeout * 
execution.checkpointing.tolerable-failed-checkpoints)
    
    If it is below it can leads to unwanted restart of the flink cluster
---
 .../operator/api/utils/BaseTestUtils.java          |  4 +-
 .../operator/observer/ClusterHealthEvaluator.java  | 30 +++++++++++-
 .../TestingFlinkDeploymentController.java          |  2 +-
 .../controller/UnhealthyDeploymentRestartTest.java | 11 +++++
 .../observer/ClusterHealthEvaluatorTest.java       | 53 ++++++++++++++++++++--
 5 files changed, 93 insertions(+), 7 deletions(-)

diff --git 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index d3217a93..89724023 100644
--- 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -170,7 +170,9 @@ public class BaseTestUtils {
                 KubernetesHaServicesFactory.class.getCanonicalName());
         conf.put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), "test");
         conf.put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), 
"test-savepoint-dir");
-        conf.put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(), 
"test-checkpoint-dir");
+        conf.put(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
+                "file:///test/test-checkpoint-dir");
 
         return FlinkDeploymentSpec.builder()
                 .image(IMAGE)
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index 6a39586f..c84f3413 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +32,9 @@ import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER;
 
 /** Evaluates whether the cluster is healthy. */
 public class ClusterHealthEvaluator {
@@ -174,6 +178,30 @@ public class ClusterHealthEvaluator {
             return true;
         }
 
+        var completedCheckpointsCheckWindow =
+                
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.configure(configuration);
+        var checkpointingInterval = checkpointConfig.getCheckpointInterval();
+        var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
+        var tolerationFailureNumber = 
checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
+        var minCompletedCheckpointsCheckWindow =
+                Math.max(
+                        checkpointingInterval * tolerationFailureNumber,
+                        checkpointingTimeout * tolerationFailureNumber);
+        if (completedCheckpointsCheckWindow.toMillis() < 
minCompletedCheckpointsCheckWindow) {
+            LOG.warn(
+                    "{} is not long enough. Default to max({} * {}, {} * {}): 
{}ms",
+                    
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
+                    CHECKPOINTING_INTERVAL.key(),
+                    TOLERABLE_FAILURE_NUMBER.key(),
+                    CHECKPOINTING_TIMEOUT.key(),
+                    TOLERABLE_FAILURE_NUMBER.key(),
+                    minCompletedCheckpointsCheckWindow);
+            completedCheckpointsCheckWindow = 
Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+        }
+
         if (observedClusterHealthInfo.getNumCompletedCheckpoints()
                 < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
             LOG.debug(
@@ -191,8 +219,6 @@ public class ClusterHealthEvaluator {
         LOG.debug("Time difference between health infos: {}", 
Duration.ofMillis(timestampDiffMs));
 
         boolean isHealthy = true;
-        var completedCheckpointsCheckWindow =
-                
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
         var completedCheckpointsCheckWindowMs = 
completedCheckpointsCheckWindow.toMillis();
 
         if (observedClusterHealthInfo.getNumCompletedCheckpoints()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 8056a0ee..8d435a61 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -75,7 +75,7 @@ public class TestingFlinkDeploymentController
 
     @Getter private TestingFlinkResourceContextFactory contextFactory;
 
-    private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder;
+    @Getter private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> 
statusRecorder;
     @Getter private CanaryResourceManager<FlinkDeployment> 
canaryResourceManager;
 
     private Map<ResourceID, Tuple2<FlinkDeploymentSpec, Long>> 
currentGenerations = new HashMap<>();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 5cdb9781..050db7d4 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -39,6 +40,8 @@ import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
+import static 
org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator.getLastValidClusterHealthInfo;
+import static 
org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator.setLastValidClusterHealthInfo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -130,6 +133,14 @@ public class UnhealthyDeploymentRestartTest {
 
         // Make deployment unhealthy
         
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        ClusterHealthInfo clusterHealthInfo =
+                
getLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo());
+        // Ensure the last savepoint has been taken more than 10 minutes ago 
(Default checkpoint
+        // interval)
+        clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
+        setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), 
clusterHealthInfo);
+        testController.getStatusRecorder().patchAndCacheStatus(appCluster, 
kubernetesClient);
         testController.reconcile(appCluster, context);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index d7748fe0..9d58d71b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -22,6 +22,9 @@ import 
org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Clock;
 import java.time.Duration;
@@ -30,6 +33,7 @@ import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static java.time.Instant.ofEpochMilli;
 import static java.time.Instant.ofEpochSecond;
@@ -37,6 +41,9 @@ import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -56,6 +63,7 @@ class ClusterHealthEvaluatorTest {
     @BeforeEach
     public void beforeEach() {
         configuration = new Configuration();
+        configuration.set(CHECKPOINTING_TIMEOUT, Duration.ofSeconds(30));
 
         clusterInfo = new HashMap<>();
 
@@ -266,17 +274,56 @@ class ClusterHealthEvaluatorTest {
         assertClusterHealthIs(true);
     }
 
-    @Test
-    public void 
evaluateShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow() {
+    private static Stream<Arguments> provideParametersEvaluateCheckpointing() {
+        Instant tenSecInstant = ofEpochSecond(10);
+        Instant twoMinInstant = ofEpochSecond(120);
+        Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);
+        return Stream.of(
+                // 
ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
+                Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null, 
false),
+                // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
+                Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null, 
true),
+                // 
ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
+                Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null, 
false),
+                // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
+                Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
+                // 
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
+                Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3, 
false),
+                // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
+                Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null, 
true),
+                // 
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
+                Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null, 
false),
+                // 
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
+                Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
+                // 
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
+                Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3, 
false));
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideParametersEvaluateCheckpointing")
+    public void evaluateCheckpointing(
+            Instant validInstant1,
+            Instant validInstant2,
+            Long checkpointingInterval,
+            long checkpointingTimeout,
+            Integer tolerationFailureNumber,
+            boolean expectedIsHealthy) {
         
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, 
true);
         configuration.set(
                 OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, 
Duration.ofMinutes(1));
+        if (checkpointingInterval != null) {
+            configuration.set(CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(checkpointingInterval));
+        }
+        configuration.set(CHECKPOINTING_TIMEOUT, 
Duration.ofSeconds(checkpointingTimeout));
+        if (tolerationFailureNumber != null) {
+            configuration.set(TOLERABLE_FAILURE_NUMBER, 
tolerationFailureNumber);
+        }
         var observedClusterHealthInfo1 = 
createClusterHealthInfo(validInstant1, 0, 0);
         var observedClusterHealthInfo2 = 
createClusterHealthInfo(validInstant2, 0, 0);
 
         setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, 
observedClusterHealthInfo2);
-        assertClusterHealthIs(false);
+        assertClusterHealthIs(expectedIsHealthy);
     }
 
     private ClusterHealthInfo createClusterHealthInfo(

Reply via email to