This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new cb90b10c [FLINK-35126] Rework default checkpoint progress check window
cb90b10c is described below
commit cb90b10c93241f9994acdd11d59ce1ec25ccbc47
Author: Gyula Fora <[email protected]>
AuthorDate: Mon May 6 11:51:33 2024 +0200
[FLINK-35126] Rework default checkpoint progress check window
---
.../shortcodes/generated/dynamic_section.html | 6 +--
.../kubernetes_operator_config_configuration.html | 6 +--
.../config/KubernetesOperatorConfigOptions.java | 6 +--
.../operator/observer/ClusterHealthEvaluator.java | 50 ++++++++++++++--------
.../controller/UnhealthyDeploymentRestartTest.java | 2 +-
.../observer/ClusterHealthEvaluatorTest.java | 34 ++++++++++-----
6 files changed, 66 insertions(+), 38 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 9c1083fc..be963815 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -22,15 +22,15 @@
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
- <td style="word-wrap: break-word;">false</td>
+ <td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable checkpoint progress health check for
clusters.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
- <td style="word-wrap: break-word;">5 min</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
- <td>If no checkpoints are completed within the defined time
window, the job is considered unhealthy. This must be bigger than checkpointing
interval.</td>
+ <td>If no checkpoints are completed within the defined time
window, the job is considered unhealthy. The minimum window size is
`max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures +
2)`, which also serves as the default value when checkpointing is enabled. For
example with checkpoint interval 10 minutes and 0 tolerable failures, the
default progress check window will be 20 minutes.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 172029c8..218e2626 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -22,15 +22,15 @@
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
- <td style="word-wrap: break-word;">false</td>
+ <td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable checkpoint progress health check for
clusters.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
- <td style="word-wrap: break-word;">5 min</td>
+ <td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
- <td>If no checkpoints are completed within the defined time
window, the job is considered unhealthy. This must be bigger than checkpointing
interval.</td>
+ <td>If no checkpoints are completed within the defined time
window, the job is considered unhealthy. The minimum window size is
`max(checkpointingInterval, checkpointTimeout) * (tolerableCheckpointFailures +
2)`, which also serves as the default value when checkpointing is enabled. For
example with checkpoint interval 10 minutes and 0 tolerable failures, the
default progress check window will be 20 minutes.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index b065d228..a9e6d322 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -502,7 +502,7 @@ public class KubernetesOperatorConfigOptions {
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
operatorConfig("cluster.health-check.checkpoint-progress.enabled")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription(
"Whether to enable checkpoint progress
health check for clusters.");
@@ -511,9 +511,9 @@ public class KubernetesOperatorConfigOptions {
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
operatorConfig("cluster.health-check.checkpoint-progress.window")
.durationType()
- .defaultValue(Duration.ofMinutes(5))
+ .noDefaultValue()
.withDescription(
- "If no checkpoints are completed within
the defined time window, the job is considered unhealthy. This must be bigger
than checkpointing interval.");
+ "If no checkpoints are completed within
the defined time window, the job is considered unhealthy. The minimum window
size is `max(checkpointingInterval, checkpointTimeout) *
(tolerableCheckpointFailures + 2)`, which also serves as the default value when
checkpointing is enabled. For example with checkpoint interval 10 minutes and 0
tolerable failures, the default progress check window will be 20 minutes.");
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_JOB_RESTART_FAILED =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index c84f3413..d38a8f0f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -178,30 +178,46 @@ public class ClusterHealthEvaluator {
return true;
}
- var completedCheckpointsCheckWindow =
-
configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+ var windowOpt =
+
configuration.getOptional(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
CheckpointConfig checkpointConfig = new CheckpointConfig();
checkpointConfig.configure(configuration);
var checkpointingInterval = checkpointConfig.getCheckpointInterval();
var checkpointingTimeout = checkpointConfig.getCheckpointTimeout();
- var tolerationFailureNumber =
checkpointConfig.getTolerableCheckpointFailureNumber() + 1;
- var minCompletedCheckpointsCheckWindow =
- Math.max(
- checkpointingInterval * tolerationFailureNumber,
- checkpointingTimeout * tolerationFailureNumber);
- if (completedCheckpointsCheckWindow.toMillis() <
minCompletedCheckpointsCheckWindow) {
- LOG.warn(
- "{} is not long enough. Default to max({} * {}, {} * {}):
{}ms",
-
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW.key(),
- CHECKPOINTING_INTERVAL.key(),
- TOLERABLE_FAILURE_NUMBER.key(),
- CHECKPOINTING_TIMEOUT.key(),
- TOLERABLE_FAILURE_NUMBER.key(),
- minCompletedCheckpointsCheckWindow);
- completedCheckpointsCheckWindow =
Duration.ofMillis(minCompletedCheckpointsCheckWindow);
+ var tolerationFailureNumber =
checkpointConfig.getTolerableCheckpointFailureNumber() + 2;
+ var minCheckWindow =
+ Duration.ofMillis(
+ Math.max(
+ checkpointingInterval *
tolerationFailureNumber,
+ checkpointingTimeout *
tolerationFailureNumber));
+
+ if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled())
{
+ // If no explicit checkpoint check window is specified and
checkpointing is disabled
+ // based on the config, we don't do anything
+ return true;
}
+ var completedCheckpointsCheckWindow =
+ windowOpt
+ .filter(
+ d -> {
+ if (d.compareTo(minCheckWindow) < 0) {
+ LOG.debug(
+ "{} is not long enough.
Default to max({} * {}, {} * {}): {}",
+
OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW
+ .key(),
+ CHECKPOINTING_INTERVAL.key(),
+ TOLERABLE_FAILURE_NUMBER.key(),
+ CHECKPOINTING_TIMEOUT.key(),
+ TOLERABLE_FAILURE_NUMBER.key(),
+ minCheckWindow);
+ return false;
+ }
+ return true;
+ })
+ .orElse(minCheckWindow);
+
if (observedClusterHealthInfo.getNumCompletedCheckpoints()
< lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
LOG.debug(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 050db7d4..c7b26d1b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -138,7 +138,7 @@ public class UnhealthyDeploymentRestartTest {
// Ensure the last savepoint has been taken more than 10 minutes ago
(Default checkpoint
// interval)
clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
-
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
+
clusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 1200000);
setLastValidClusterHealthInfo(appCluster.getStatus().getClusterInfo(),
clusterHealthInfo);
testController.getStatusRecorder().patchAndCacheStatus(appCluster,
kubernetesClient);
testController.reconcile(appCluster, context);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index 9d58d71b..83a0e751 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -159,6 +159,8 @@ class ClusterHealthEvaluatorTest {
@Test
public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED,
true);
+ configuration.set(
+ OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW,
Duration.ofMinutes(5));
var observedClusterHealthInfo1 =
createClusterHealthInfo(validInstant1, 0, 1);
var observedClusterHealthInfo2 =
createClusterHealthInfo(validInstant2, 0, 0);
@@ -278,25 +280,33 @@ class ClusterHealthEvaluatorTest {
Instant tenSecInstant = ofEpochSecond(10);
Instant twoMinInstant = ofEpochSecond(120);
Instant fourMinInstant = twoMinInstant.plus(2, ChronoUnit.MINUTES);
+
+ Duration oneMin = Duration.ofMinutes(1);
return Stream.of(
//
ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow
- Arguments.of(twoMinInstant, fourMinInstant, 30L, 30L, null,
false),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 30L,
null, false),
+ // Verify checkpoint progress even if checkpointing not
configured
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, null, 30L,
null, false),
+ // Verify default window if not explicitly configured
+ Arguments.of(twoMinInstant, fourMinInstant, null, 30L, 30L,
null, false),
+ // Verify check is off if both window and checkpointing is not
configured
+ Arguments.of(twoMinInstant, fourMinInstant, null, null, 30L,
null, true),
//
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
- Arguments.of(twoMinInstant, fourMinInstant, 120L, 30L, null,
true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 60L, 30L,
null, true),
//
ShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointInterval
- Arguments.of(tenSecInstant, fourMinInstant, 120L, 30L, null,
false),
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 60L, 30L,
null, false),
//
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
- Arguments.of(twoMinInstant, fourMinInstant, 30L, 10L, 3, true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 10L,
3, true),
//
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointIntervalTimesNbTolerableFailure
- Arguments.of(tenSecInstant, fourMinInstant, 30L, 10L, 3,
false),
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 10L,
3, false),
//
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
- Arguments.of(twoMinInstant, fourMinInstant, 30L, 120L, null,
true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 30L, 60L,
null, true),
//
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeout
- Arguments.of(tenSecInstant, fourMinInstant, 30L, 120L, null,
false),
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 30L, 60L,
null, false),
//
ShouldMarkClusterHealthyWhenCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
- Arguments.of(twoMinInstant, fourMinInstant, 10L, 30L, 3, true),
+ Arguments.of(twoMinInstant, fourMinInstant, oneMin, 10L, 30L,
3, true),
//
ShouldMarkClusterHealthyWhenNoCompletedCheckpointsWithOutsideWindowFromCheckpointingTimeoutTimesNbTolerableFailure
- Arguments.of(tenSecInstant, fourMinInstant, 10L, 30L, 3,
false));
+ Arguments.of(tenSecInstant, fourMinInstant, oneMin, 10L, 30L,
3, false));
}
@ParameterizedTest
@@ -304,13 +314,15 @@ class ClusterHealthEvaluatorTest {
public void evaluateCheckpointing(
Instant validInstant1,
Instant validInstant2,
+ Duration window,
Long checkpointingInterval,
long checkpointingTimeout,
Integer tolerationFailureNumber,
boolean expectedIsHealthy) {
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED,
true);
- configuration.set(
- OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW,
Duration.ofMinutes(1));
+ if (window != null) {
+
configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW,
window);
+ }
if (checkpointingInterval != null) {
configuration.set(CHECKPOINTING_INTERVAL,
Duration.ofSeconds(checkpointingInterval));
}