This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 775bc5f4 [FLINK-34131] Ensure cluster.health-check.checkpoint-progress.window is well configured
775bc5f4 is described below
commit 775bc5f41df09accefca6590112509c6023b2fb4
Author: [email protected] <[email protected]>
AuthorDate: Thu Jan 18 09:41:47 2024 +0100
[FLINK-34131] Ensure cluster.health-check.checkpoint-progress.window is well configured

cluster.health-check.checkpoint-progress.window must be greater than
max(execution.checkpointing.interval * execution.checkpointing.tolerable-failed-checkpoints,
    execution.checkpointing.timeout * execution.checkpointing.tolerable-failed-checkpoints).
If it is below this minimum, it can lead to unwanted restarts of the Flink cluster.
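
For illustration (assumed example values, not taken from this commit; note that
the implementation multiplies by tolerable-failed-checkpoints + 1, counting the
attempt that is still allowed to fail):

    execution.checkpointing.interval: 1 min
    execution.checkpointing.timeout: 10 min
    execution.checkpointing.tolerable-failed-checkpoints: 2

    min window = max(1 min * (2 + 1), 10 min * (2 + 1)) = 30 min

With these values, any configured cluster.health-check.checkpoint-progress.window
below 30 minutes is widened to that minimum and a warning is logged.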
---
.../operator/api/utils/BaseTestUtils.java | 4 +-
.../operator/observer/ClusterHealthEvaluator.java | 30 +++++++++++-
.../TestingFlinkDeploymentController.java | 2 +-
.../controller/UnhealthyDeploymentRestartTest.java | 11 +++++
.../observer/ClusterHealthEvaluatorTest.java | 53 ++++++++++++++++++++--
5 files changed, 93 insertions(+), 7 deletions(-)
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index d3217a93..89724023 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -170,7 +170,9 @@ public class BaseTestUtils {
                         KubernetesHaServicesFactory.class.getCanonicalName());
         conf.put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), "test");
         conf.put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "test-savepoint-dir");
-        conf.put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(), "test-checkpoint-dir");
+        conf.put(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
+                "file:///test/test-checkpoint-dir");

         return FlinkDeploymentSpec.builder()
                 .image(IMAGE)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index 6a39586f..c84f3413 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +32,9 @@ import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER;

 /** Evaluates whether the cluster is healthy. */
 public class ClusterHealthEvaluator {
@@ -174,6 +178,30 @@ public class ClusterHealthEvaluator {
             return true;
         }

+        var completedCheckpointsCheckWindow =
+                configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.configure(configuration);
+        var checkpointingInterval = checkpointConfig.getCheckpointInterval();
+        var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
+        var tolerationFailureNumber = checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
+        var minCompletedCheckpointsCheckWindow =
+                Math.max(
+                        checkpointingInterval * tolerationFailureNumber,
+                        checkpointingTimeout * tolerationFailureNumber);
+        if (completedCheckpointsCheckWindow.toMillis() < minCompletedCheckpointsCheckWindow) {
+            LOG.warn(
+                    "{} is not long enough. Defaulting to max({} * {}, {} * {}): {}ms",
+                    OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
+                    CHECKPOINTING_INTERVAL.key(),
+                    TOLERABLE_FAILURE_NUMBER.key(),
+                    CHECKPOINTING_TIMEOUT.key(),
+                    TOLERABLE_FAILURE_NUMBER.key(),
+                    minCompletedCheckpointsCheckWindow);
+            completedCheckpointsCheckWindow = Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+        }
+
         if (observedClusterHealthInfo.getNumCompletedCheckpoints()
                 < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
             LOG.debug(
@@ -191,8 +219,6 @@ public class ClusterHealthEvaluator {
         LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));

         boolean isHealthy = true;
-        var completedCheckpointsCheckWindow =
-                configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
         var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();

         if (observedClusterHealthInfo.getNumCompletedCheckpoints()
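
For readers skimming the diff, the new guard boils down to the following
standalone sketch. The hard-coded values are assumptions standing in for what
the operator reads from the Flink configuration, and MinWindowSketch is not
part of the commit:

    import java.time.Duration;

    public class MinWindowSketch {
        public static void main(String[] args) {
            // Assumed example values; the operator reads these via CheckpointConfig.
            Duration interval = Duration.ofMinutes(1); // execution.checkpointing.interval
            Duration timeout = Duration.ofMinutes(10); // execution.checkpointing.timeout
            int tolerableFailures = 2; // execution.checkpointing.tolerable-failed-checkpoints

            // One more checkpoint attempt than the tolerated failures may be in flight.
            long attempts = tolerableFailures + 1;
            long minWindowMs =
                    Math.max(interval.toMillis() * attempts, timeout.toMillis() * attempts);

            // cluster.health-check.checkpoint-progress.window as configured by the user.
            Duration window = Duration.ofMinutes(1);
            if (window.toMillis() < minWindowMs) {
                // Mirrors the new behavior: warn (omitted here) and widen to the minimum.
                window = Duration.ofMillis(minWindowMs);
            }
            System.out.println("effective window: " + window); // PT30M for these values
        }
    }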
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 8056a0ee..8d435a61 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -75,7 +75,7 @@ public class TestingFlinkDeploymentController

     @Getter private TestingFlinkResourceContextFactory contextFactory;

-    private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
+    @Getter private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
     @Getter private CanaryResourceManager<FlinkDeployment> canaryResourceManager;
     private Map<ResourceID, Tuple2<FlinkDeploymentSpec, Long>> currentGenerations = new HashMap<>();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 5cdb9781..050db7d4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;

 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -39,6 +40,8 @@ import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator.getLastValidClusterHealthInfo;
+import static org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator.setLastValidClusterHealthInfo;
 import static org.junit.jupiter.api.Assertions.assertEquals;

 /**
@@ -130,6 +133,14 @@ public class UnhealthyDeploymentRestartTest {
         // Make deployment unhealthy
         flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        ClusterHealthInfo clusterHealthInfo =
+                getLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo());
+        // Ensure the last checkpoint was taken more than 10 minutes ago (the default checkpoint
+        // interval)
+        clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
+        setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(), clusterHealthInfo);
+        testController.getStatusRecorder().patchAndCacheStatus(appCluster, kubernetesClient);
         testController.reconcile(appCluster, context);

         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
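
For context on the magic number: 600000 ms is 10 minutes. Assuming the test
keeps Flink's default execution.checkpointing.tolerable-failed-checkpoints of
0, the evaluator above requires checkpoint progress within

    max(interval * (0 + 1), timeout * (0 + 1))

so backdating the last progress timestamp by a full 10 minutes (the larger of
the two values under the test's assumed defaults) pushes the deployment outside
the widened window and triggers the restart under test.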
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index d7748fe0..9d58d71b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -22,6 +22,9 @@ import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;

 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;

 import java.time.Clock;
 import java.time.Duration;
@@ -30,6 +33,7 @@ import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;

 import static java.time.Instant.ofEpochMilli;
 import static java.time.Instant.ofEpochSecond;
@@ -37,6 +41,9 @@ import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -56,6 +63,7 @@ class ClusterHealthEvaluatorTest {
     @BeforeEach
     public void beforeEach() {
         configuration = new Configuration();
+        configuration.set(CHECKPOINTING_TIMEOUT, Duration.ofSeconds(30));

         clusterInfo = new HashMap<>();
@@ -266,17 +274,56 @@ class ClusterHealthEvaluatorTest {
         assertClusterHealthIs(true);
     }

-    @Test
-    public void evaluateShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow() {
+    private static Stream<Arguments> provideParametersEvaluateCheckpointing() {
+        Instant tenSecInstant = ofEpochSecond(10);
+        Instant twoMinInstant = ofEpochSecond(120);
+        Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);
+        return Stream.of(
+                // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
+                Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null, false),
+                // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
+                Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null, true),
+                // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
+                Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null, false),
+                // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
+                Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
+                // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
+                Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3, false),
+                // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
+                Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null, true),
+                // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
+                Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null, false),
+                // ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
+                Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
+                // ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
+                Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3, false));
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideParametersEvaluateCheckpointing")
+    public void evaluateCheckpointing(
+            Instant validInstant1,
+            Instant validInstant2,
+            Long checkpointingInterval,
+            long checkpointingTimeout,
+            Integer tolerationFailureNumber,
+            boolean expectedIsHealthy) {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
         configuration.set(
                 OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(1));
+        if (checkpointingInterval != null) {
+            configuration.set(CHECKPOINTING_INTERVAL, Duration.ofSeconds(checkpointingInterval));
+        }
+        configuration.set(CHECKPOINTING_TIMEOUT, Duration.ofSeconds(checkpointingTimeout));
+        if (tolerationFailureNumber != null) {
+            configuration.set(TOLERABLE_FAILURE_NUMBER, tolerationFailureNumber);
+        }

         var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 0);
         var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
         setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
-        assertClusterHealthIs(false);
+        assertClusterHealthIs(expectedIsHealthy);
     }

     private ClusterHealthInfo createClusterHealthInfo(