This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
     new fc1b901c  [FLINK-30781] Add completed checkpoint check to cluster health check
fc1b901c is described below
commit fc1b901c42fcd838f12254e2b47dd471f5eef8b9
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue Jan 24 17:59:23 2023 +0100
[FLINK-30781] Add completed checkpoint check to cluster health check
---
.../content/docs/custom-resource/job-management.md | 5 +-
.../shortcodes/generated/dynamic_section.html | 12 ++
.../kubernetes_operator_config_configuration.html | 12 ++
.../config/KubernetesOperatorConfigOptions.java | 18 ++
.../operator/health/ClusterHealthInfo.java | 20 +-
.../operator/observer/ClusterHealthEvaluator.java | 170 +++++++++++-----
.../operator/observer/ClusterHealthObserver.java | 21 +-
.../deployment/AbstractJobReconciler.java | 4 +-
.../deployment/ApplicationReconciler.java | 9 +-
.../controller/UnhealthyDeploymentRestartTest.java | 48 +++++
.../operator/health/ClusterHealthInfoTest.java | 22 +-
.../observer/ClusterHealthEvaluatorTest.java | 222 +++++++++++++++------
.../deployment/ApplicationReconcilerTest.java | 5 +-
13 files changed, 435 insertions(+), 133 deletions(-)
diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md
index 54013478..da0fc4aa 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -238,8 +238,11 @@ When Kubernetes HA is enabled, the operator can restart the Flink cluster deploy
 unhealthy. Unhealthy deployment restart can be turned on in the configuration by setting `kubernetes.operator.cluster.health-check.enabled` to `true` (default: `false`).
 In order for this feature to work one must enable [recovery of missing job deployments](#recovery-of-missing-job-deployments).
-At the moment deployment is considered unhealthy when Flink's restarts count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`)
+At the moment a deployment is considered unhealthy when:
+* Flink's restart count reaches `kubernetes.operator.cluster.health-check.restarts.threshold` (default: `64`) within the time window of `kubernetes.operator.cluster.health-check.restarts.window` (default: 2 minutes).
+* `kubernetes.operator.cluster.health-check.checkpoint-progress.enabled` is turned on and Flink's successful checkpoint count is not changing within the time window of `kubernetes.operator.cluster.health-check.checkpoint-progress.window` (default: 5 minutes).
## Restart failed job deployments
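For reference, a minimal sketch of enabling both checks programmatically, assuming the operator's Configuration API and the option constants defined in KubernetesOperatorConfigOptions (the checkpoint-progress constants are the ones added later in this patch):

    // Sketch only, not part of the patch: turn on both health checks.
    var configuration = new Configuration();
    configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED, true);
    configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
    // Must be bigger than the job's checkpointing interval:
    configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(5));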
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 51e1b313..707024b6 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
+        <tr>
+            <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable checkpoint progress health check for clusters.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than the checkpointing interval.</td>
+        </tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 5e69a439..69be9ba9 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
+        <tr>
+            <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable checkpoint progress health check for clusters.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.cluster.health-check.checkpoint-progress.window</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than the checkpointing interval.</td>
+        </tr>
<tr>
<td><h5>kubernetes.operator.cluster.health-check.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index ea769568..ea1c6f18 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -379,6 +379,24 @@ public class KubernetesOperatorConfigOptions {
                             "The threshold which is checked against job restart count within a configured window. "
                                     + "If the restart count is reaching the threshold then full cluster restart is initiated.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean>
+            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED =
+                    operatorConfig("cluster.health-check.checkpoint-progress.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to enable checkpoint progress health check for clusters.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Duration>
+            OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW =
+                    operatorConfig("cluster.health-check.checkpoint-progress.window")
+                            .durationType()
+                            .defaultValue(Duration.ofMinutes(5))
+                            .withDescription(
+                                    "If no checkpoints are completed within the defined time window, the job is considered unhealthy. This must be bigger than the checkpointing interval.");
+
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Boolean> OPERATOR_JOB_RESTART_FAILED =
             operatorConfig("job.restart.failed")
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
index bb76f862..8bfd1dec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 import java.time.Clock;
@@ -31,7 +30,6 @@ import java.time.Clock;
 @Experimental
 @Data
 @AllArgsConstructor
-@NoArgsConstructor
 public class ClusterHealthInfo {
 
     /** Millisecond timestamp of the last observed health information. */
     private long timeStamp;
@@ -39,15 +37,25 @@ public class ClusterHealthInfo {
     /** Number of restarts. */
     private int numRestarts;
 
+    /** Millisecond timestamp of when the number of restarts was last evaluated. */
+    private long numRestartsEvaluationTimeStamp;
+
+    /** Number of successfully completed checkpoints. */
+    private int numCompletedCheckpoints;
+
+    /** Millisecond timestamp of when the number of completed checkpoints last increased. */
+    private long numCompletedCheckpointsIncreasedTimeStamp;
+
     /** Calculated field whether the cluster is healthy or not. */
     private boolean healthy;
 
-    public static ClusterHealthInfo of(int numRestarts) {
-        return of(Clock.systemDefaultZone(), numRestarts);
+    public ClusterHealthInfo() {
+        this(Clock.systemDefaultZone());
     }
 
-    public static ClusterHealthInfo of(Clock clock, int numRestarts) {
-        return new ClusterHealthInfo(clock.millis(), numRestarts, true);
+    public ClusterHealthInfo(Clock clock) {
+        timeStamp = clock.millis();
+        healthy = true;
     }
 
     public static boolean isValid(ClusterHealthInfo clusterHealthInfo) {
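For illustration, a minimal sketch of constructing ClusterHealthInfo after this change (the setters are Lombok-generated via @Data; the values here are arbitrary):

    // Sketch only: the of() factories are gone, construction now goes
    // through the clock-based constructor plus setters.
    var healthInfo = new ClusterHealthInfo(Clock.systemDefaultZone());
    healthInfo.setNumRestarts(3);
    healthInfo.setNumCompletedCheckpoints(42);
    // timeStamp is taken from the clock and healthy defaults to true.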
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
index 4f5f3dc6..6a39586f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java
@@ -27,6 +27,8 @@ import java.time.Clock;
 import java.time.Duration;
 import java.util.Map;
 
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
@@ -75,6 +77,10 @@ public class ClusterHealthEvaluator {
             var lastValidClusterHealthInfo = getLastValidClusterHealthInfo(clusterInfo);
             if (lastValidClusterHealthInfo == null) {
                 LOG.debug("No last valid health info, skipping health check");
+                observedClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                        observedClusterHealthInfo.getTimeStamp());
+                observedClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                        observedClusterHealthInfo.getTimeStamp());
                 setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
             } else if (observedClusterHealthInfo.getTimeStamp()
                     < lastValidClusterHealthInfo.getTimeStamp()) {
@@ -82,65 +88,127 @@ public class ClusterHealthEvaluator {
                             "Observed health info timestamp is less than the last valid health info timestamp, this indicates a bug...";
                 LOG.error(msg);
                 throw new IllegalStateException(msg);
-            } else if (observedClusterHealthInfo.getNumRestarts()
-                    < lastValidClusterHealthInfo.getNumRestarts()) {
-                LOG.debug(
-                        "Observed health info number of restarts is less than the last valid health info number of restarts, skipping health check");
-                setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
             } else {
-                boolean isHealthy = true;
-
                 LOG.debug("Valid health info exist, checking cluster health");
                 LOG.debug("Last valid health info: {}", lastValidClusterHealthInfo);
                 LOG.debug("Observed health info: {}", observedClusterHealthInfo);
 
-                var timestampDiffMs =
-                        observedClusterHealthInfo.getTimeStamp()
-                                - lastValidClusterHealthInfo.getTimeStamp();
-                LOG.debug(
-                        "Time difference between health infos: {}",
-                        Duration.ofMillis(timestampDiffMs));
-
-                var restartCheckWindow =
-                        configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
-                var restartCheckWindowMs = restartCheckWindow.toMillis();
-                double countMultiplier = (double) restartCheckWindowMs / (double) timestampDiffMs;
-                // If the 2 health info timestamp difference is within the window then no
-                // scaling needed
-                if (countMultiplier > 1) {
-                    countMultiplier = 1;
-                }
-                long numRestarts =
-                        (long)
-                                ((double)
-                                                (observedClusterHealthInfo.getNumRestarts()
-                                                        - lastValidClusterHealthInfo
-                                                                .getNumRestarts())
-                                        * countMultiplier);
-                LOG.debug(
-                        "Calculated restart count for {} window: {}",
-                        restartCheckWindow,
-                        numRestarts);
-
-                if (lastValidClusterHealthInfo.getTimeStamp()
-                        < clock.millis() - restartCheckWindowMs) {
-                    LOG.debug("Last valid health info timestamp is outside of the window");
-                    setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
-                }
-
-                var restartThreshold =
-                        configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
-                if (numRestarts > restartThreshold) {
-                    LOG.info("Restart count hit threshold: {}", restartThreshold);
-                    setLastValidClusterHealthInfo(clusterInfo, observedClusterHealthInfo);
-                    isHealthy = false;
-                }
-
-                // Update the health flag
-                lastValidClusterHealthInfo = getLastValidClusterHealthInfo(clusterInfo);
+                boolean isHealthy =
+                        evaluateRestarts(
+                                        configuration,
+                                        clusterInfo,
+                                        lastValidClusterHealthInfo,
+                                        observedClusterHealthInfo)
+                                && evaluateCheckpoints(
+                                        configuration,
+                                        lastValidClusterHealthInfo,
+                                        observedClusterHealthInfo);
+
+                lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
                 lastValidClusterHealthInfo.setHealthy(isHealthy);
                 setLastValidClusterHealthInfo(clusterInfo, lastValidClusterHealthInfo);
             }
         }
     }
+
+    private boolean evaluateRestarts(
+            Configuration configuration,
+            Map<String, String> clusterInfo,
+            ClusterHealthInfo lastValidClusterHealthInfo,
+            ClusterHealthInfo observedClusterHealthInfo) {
+
+        if (observedClusterHealthInfo.getNumRestarts()
+                < lastValidClusterHealthInfo.getNumRestarts()) {
+            LOG.debug(
+                    "Observed health info number of restarts is less than in the last valid health info, skipping health check");
+            lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
+            lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+            return true;
+        }
+
+        var timestampDiffMs =
+                observedClusterHealthInfo.getTimeStamp()
+                        - lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp();
+        LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));
+
+        var restartCheckWindow = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
+        var restartCheckWindowMs = restartCheckWindow.toMillis();
+        double countMultiplier = (double) restartCheckWindowMs / (double) timestampDiffMs;
+        // If the 2 health info timestamp difference is within the window then no
+        // scaling needed
+        if (countMultiplier > 1) {
+            countMultiplier = 1;
+        }
+        long numRestarts =
+                (long)
+                        ((double)
+                                        (observedClusterHealthInfo.getNumRestarts()
+                                                - lastValidClusterHealthInfo.getNumRestarts())
+                                * countMultiplier);
+        LOG.debug("Calculated restart count for {} window: {}", restartCheckWindow, numRestarts);
+
+        var restartThreshold = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
+        boolean isHealthy = numRestarts <= restartThreshold;
+        if (!isHealthy) {
+            LOG.info("Restart count hit threshold: {}", restartThreshold);
+        }
+
+        if (lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp()
+                < clock.millis() - restartCheckWindowMs) {
+            LOG.debug(
+                    "Last valid number of restarts evaluation timestamp is outside of the window");
+            lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
+            lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+        }
+
+        return isHealthy;
+    }
+
+    private boolean evaluateCheckpoints(
+            Configuration configuration,
+            ClusterHealthInfo lastValidClusterHealthInfo,
+            ClusterHealthInfo observedClusterHealthInfo) {
+        if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
+            return true;
+        }
+
+        if (observedClusterHealthInfo.getNumCompletedCheckpoints()
+                < lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
+            LOG.debug(
+                    "Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
+            lastValidClusterHealthInfo.setNumCompletedCheckpoints(
+                    observedClusterHealthInfo.getNumCompletedCheckpoints());
+            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+            return true;
+        }
+
+        var timestampDiffMs =
+                observedClusterHealthInfo.getTimeStamp()
+                        - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
+        LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));
+
+        boolean isHealthy = true;
+        var completedCheckpointsCheckWindow =
+                configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW);
+        var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();
+
+        if (observedClusterHealthInfo.getNumCompletedCheckpoints()
+                > lastValidClusterHealthInfo.getNumCompletedCheckpoints()) {
+            LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
+            lastValidClusterHealthInfo.setNumCompletedCheckpoints(
+                    observedClusterHealthInfo.getNumCompletedCheckpoints());
+            lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
+                    observedClusterHealthInfo.getTimeStamp());
+        } else if (lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp()
+                        + completedCheckpointsCheckWindowMs
+                < clock.millis()) {
+            LOG.info("Cluster is not able to complete checkpoints");
+            isHealthy = false;
+        }
+
+        return isHealthy;
+    }
 }
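A worked example of the window scaling in evaluateRestarts, with illustrative numbers that mirror the unhealthy-in-average test below:

    // Sketch only: 2 minutes elapsed between evaluations, 1 minute window.
    long timestampDiffMs = Duration.ofMinutes(2).toMillis();      // 120000
    long restartCheckWindowMs = Duration.ofMinutes(1).toMillis(); // 60000
    double countMultiplier =
            Math.min(1.0, (double) restartCheckWindowMs / timestampDiffMs); // 0.5
    // 500 observed restarts scale to 250 per window; with a threshold of 100
    // the cluster is marked unhealthy.
    long numRestarts = (long) ((500 - 0) * countMultiplier); // 250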
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
index 6f63cfd2..0dae9353 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.java
@@ -33,6 +33,8 @@ public class ClusterHealthObserver {
     private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthObserver.class);
     private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
     private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
+    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME =
+            "numberOfCompletedCheckpoints";
 
     private final ClusterHealthEvaluator clusterHealthEvaluator;
 
     public ClusterHealthObserver() {
@@ -56,22 +58,23 @@ public class ClusterHealthObserver {
                             .getMetrics(
                                     ctx.getObserveConfig(),
                                     jobId,
-                                    List.of(FULL_RESTARTS_METRIC_NAME, NUM_RESTARTS_METRIC_NAME));
-            ClusterHealthInfo observedClusterHealthInfo;
+                                    List.of(
+                                            FULL_RESTARTS_METRIC_NAME,
+                                            NUM_RESTARTS_METRIC_NAME,
+                                            NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));
+            ClusterHealthInfo observedClusterHealthInfo = new ClusterHealthInfo();
             if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
                 LOG.debug(NUM_RESTARTS_METRIC_NAME + " metric is used");
-                observedClusterHealthInfo =
-                        ClusterHealthInfo.of(
-                                Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
+                observedClusterHealthInfo.setNumRestarts(
+                        Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
             } else if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
                 LOG.debug(
                         FULL_RESTARTS_METRIC_NAME
                                 + " metric is used because "
                                 + NUM_RESTARTS_METRIC_NAME
                                 + " is missing");
-                observedClusterHealthInfo =
-                        ClusterHealthInfo.of(
-                                Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
+                observedClusterHealthInfo.setNumRestarts(
+                        Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
             } else {
                 throw new IllegalStateException(
                         "No job restart metric found. Either "
@@ -80,6 +83,8 @@ public class ClusterHealthObserver {
                                 + NUM_RESTARTS_METRIC_NAME
                                 + "(new) must exist.");
             }
+            observedClusterHealthInfo.setNumCompletedCheckpoints(
+                    Integer.parseInt(metrics.get(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME)));
             LOG.debug("Observed cluster health: {}", observedClusterHealthInfo);
 
             clusterHealthEvaluator.evaluate(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index e9c170ca..b9e2fa6d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -249,7 +249,9 @@ public abstract class AbstractJobReconciler<
             throws Exception {
         LOG.info("Resubmitting Flink job...");
         SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());
-        specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        if (requireHaMetadata) {
+            specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        }
         restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 13317e65..4db3302e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -145,6 +145,10 @@ public class ApplicationReconciler
         var relatedResource = ctx.getResource();
         var status = relatedResource.getStatus();
         var flinkService = ctx.getFlinkService();
+
+        ClusterHealthEvaluator.removeLastValidClusterHealthInfo(
+                relatedResource.getStatus().getClusterInfo());
+
         if (savepoint.isPresent()) {
             deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
         } else {
@@ -271,7 +275,10 @@ public class ApplicationReconciler
                     MSG_RESTART_UNHEALTHY);
             cleanupAfterFailedJob(ctx);
         }
-        resubmitJob(ctx, true);
+        boolean requireHaMetadata =
+                ReconciliationUtils.getDeployedSpec(ctx.getResource()).getJob().getUpgradeMode()
+                        != UpgradeMode.STATELESS;
+        resubmitJob(ctx, requireHaMetadata);
         return true;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index e5814a0a..e643c0d2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -33,7 +33,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.time.Duration;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** @link Unhealthy deployment restart tests */
@@ -42,6 +47,9 @@ public class UnhealthyDeploymentRestartTest {
 
     private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
 
+    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME =
+            "numberOfCompletedCheckpoints";
+
     private FlinkConfigManager configManager;
 
     private TestingFlinkService flinkService;
@@ -54,12 +62,18 @@ public class UnhealthyDeploymentRestartTest {
     public void setup() {
         var configuration = new Configuration();
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED, true);
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 64);
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ZERO);
         configManager = new FlinkConfigManager(configuration);
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
                 new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
+
+        flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
+        flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
     }
     @ParameterizedTest
@@ -95,4 +109,38 @@ public class UnhealthyDeploymentRestartTest {
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
     }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
+    public void verifyApplicationNoCompletedCheckpointsJmRecovery(
+            FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
+        appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
+
+        // Start a healthy deployment
+        flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+
+        // Make deployment unhealthy
+        flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // After restart the deployment is healthy again
+        flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "2");
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
index 2b6707c0..77f8c009 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java
@@ -32,19 +32,33 @@ class ClusterHealthInfoTest {
     @Test
     public void isValidShouldReturnFalseWhenTimestampIsZero() {
         var clock = Clock.fixed(ofEpochSecond(0), ZoneId.systemDefault());
-        assertFalse(ClusterHealthInfo.isValid(ClusterHealthInfo.of(clock, 0)));
+        assertFalse(ClusterHealthInfo.isValid(new ClusterHealthInfo(clock)));
     }
 
     @Test
     public void isValidShouldReturnTrueWhenTimestampIsNonzero() {
         var clock = Clock.fixed(ofEpochSecond(1), ZoneId.systemDefault());
-        assertTrue(ClusterHealthInfo.isValid(ClusterHealthInfo.of(clock, 0)));
+        assertTrue(ClusterHealthInfo.isValid(new ClusterHealthInfo(clock)));
+    }
+
+    @Test
+    public void deserializeWithOldVersionShouldDeserializeCorrectly() {
+        var clusterHealthInfoJson = "{\"timeStamp\":1,\"numRestarts\":2,\"healthy\":true}";
+        var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);
+        assertEquals(1, clusterHealthInfoFromJson.getTimeStamp());
+        assertEquals(2, clusterHealthInfoFromJson.getNumRestarts());
+        assertTrue(clusterHealthInfoFromJson.isHealthy());
     }
 
     @Test
     public void serializationRoundTrip() {
-        var clock = Clock.fixed(ofEpochSecond(123), ZoneId.systemDefault());
-        var clusterHealthInfo = ClusterHealthInfo.of(clock, 456);
+        var clock = Clock.fixed(ofEpochSecond(1), ZoneId.systemDefault());
+        var clusterHealthInfo = new ClusterHealthInfo(clock);
+        clusterHealthInfo.setNumRestarts(2);
+        clusterHealthInfo.setNumRestartsEvaluationTimeStamp(3);
+        clusterHealthInfo.setNumCompletedCheckpoints(4);
+        clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(5);
+        clusterHealthInfo.setHealthy(false);
 
         var clusterHealthInfoJson = ClusterHealthInfo.serialize(clusterHealthInfo);
         var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
index e6cf9a63..d7748fe0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
 
 import static java.time.Instant.ofEpochMilli;
 import static java.time.Instant.ofEpochSecond;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,7 +49,8 @@ class ClusterHealthEvaluatorTest {
     private Map<String, String> clusterInfo;
     private ClusterHealthEvaluator clusterHealthEvaluator;
     private final Instant invalidInstant = ofEpochMilli(0);
-    private final Instant validInstant = ofEpochMilli(1);
+    private final Instant validInstant1 = ofEpochSecond(120);
+    private final Instant validInstant2 = validInstant1.plus(2, ChronoUnit.MINUTES);
     private ClusterHealthInfo invalidClusterHealthInfo;
 
     @BeforeEach
@@ -56,11 +59,11 @@ class ClusterHealthEvaluatorTest {
         clusterInfo = new HashMap<>();
 
-        var now = Clock.fixed(ofEpochSecond(120), ZoneId.systemDefault());
-        clusterHealthEvaluator = new ClusterHealthEvaluator(now);
-
         var clock = Clock.fixed(invalidInstant, ZoneId.systemDefault());
-        invalidClusterHealthInfo = ClusterHealthInfo.of(clock, 0);
+        invalidClusterHealthInfo = new ClusterHealthInfo(clock);
+
+        var now = Clock.fixed(validInstant2, ZoneId.systemDefault());
+        clusterHealthEvaluator = new ClusterHealthEvaluator(now);
     }
     @Test
@@ -71,18 +74,14 @@ class ClusterHealthEvaluatorTest {
 
     @Test
     public void evaluateShouldSetLastStateWhenValidObserved() {
-        var observedClusterHealthInfo = createClusterHealthInfo(validInstant, 0);
-
-        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo);
-        assertEquals(
-                observedClusterHealthInfo,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+        var observedClusterHealthInfo = createClusterHealthInfo(validInstant1, 0, 1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo);
     }
 
     @Test
     public void evaluateShouldThrowExceptionWhenObservedTimestampIsOld() {
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant.plusMillis(100), 0);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 1);
 
         ClusterHealthEvaluator.setLastValidClusterHealthInfo(
                 clusterInfo, observedClusterHealthInfo2);
@@ -94,103 +93,206 @@ class ClusterHealthEvaluatorTest {
     }
     @Test
-    public void evaluateShouldOverwriteLastStateWhenRestartCountIsLess() {
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 1);
-        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant, 0);
+    public void evaluateShouldOverwriteRestartCountWhenLess() {
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 1, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 1);
 
-        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo1);
-        assertEquals(
-                observedClusterHealthInfo1,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
         assertEquals(
-                observedClusterHealthInfo2,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+                observedClusterHealthInfo2.getNumRestarts(),
+                lastValidClusterHealthInfo.getNumRestarts());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp());
     }
     @Test
-    public void evaluateShouldNotOverwriteLastStateWhenTimestampIsInWindow() {
+    public void evaluateShouldNotOverwriteRestartCountWhenTimestampIsInWindow() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(2));
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 0);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 1, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
         assertEquals(
-                observedClusterHealthInfo1,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+                observedClusterHealthInfo1.getNumRestarts(),
+                lastValidClusterHealthInfo.getNumRestarts());
+        assertEquals(
+                observedClusterHealthInfo1.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp());
     }
     @Test
-    public void evaluateShouldOverwriteLastStateWhenTimestampIsOutOfWindow() {
+    public void evaluateShouldOverwriteRestartCountWhenTimestampIsOutOfWindow() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(1));
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 0);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 1, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
         assertEquals(
-                observedClusterHealthInfo2,
-                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
+                observedClusterHealthInfo2.getNumRestarts(),
+                lastValidClusterHealthInfo.getNumRestarts());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp());
+    }
+
+    @Test
+    public void evaluateShouldOverwriteCompletedCheckpointCountWhenLess() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
+        assertEquals(
+                observedClusterHealthInfo2.getNumCompletedCheckpoints(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpoints());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp());
+    }
+
+    @Test
+    public void evaluateShouldOverwriteCompletedCheckpointWhenIncreased() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(2));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 2);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
+        assertEquals(
+                observedClusterHealthInfo2.getNumCompletedCheckpoints(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpoints());
+        assertEquals(
+                observedClusterHealthInfo2.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp());
+    }
+
+    @Test
+    public void evaluateShouldNotOverwriteCompletedCheckpointWhenNotIncreased() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(2));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 1);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        var lastValidClusterHealthInfo =
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
+        assertNotNull(lastValidClusterHealthInfo);
+        assertEquals(
+                observedClusterHealthInfo1.getNumCompletedCheckpoints(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpoints());
+        assertEquals(
+                observedClusterHealthInfo1.getTimeStamp(),
+                lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp());
+    }
     @Test
     public void evaluateShouldMarkClusterHealthyWhenNoPreviousState() {
-        var observedClusterHealthInfo = createClusterHealthInfo(validInstant, 1);
+        var observedClusterHealthInfo = createClusterHealthInfo(validInstant1, 1, 1);
 
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo);
         assertClusterHealthIs(true);
     }
 
     @Test
-    public void evaluateShouldMarkClusterHealthyWhenThresholdNotHit() {
+    public void evaluateShouldMarkClusterHealthyWhenRestartThresholdNotHit() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(5));
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 100);
-        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 100);
+        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 100, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
         assertClusterHealthIs(true);
     }
 
     @Test
-    public void evaluateShouldMarkClusterUnhealthyWhenThresholdHitImmediately() {
+    public void evaluateShouldMarkClusterUnhealthyWhenRestartThresholdHitImmediately() {
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(5));
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 100);
-        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(1, ChronoUnit.MINUTES), 101);
+        ClusterHealthInfo observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 101, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
         assertClusterHealthIs(false);
     }
     @Test
-    public void evaluateShouldMarkClusterUnhealthyWhenThresholdHitInAverage() {
-        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(5));
+    public void evaluateShouldMarkClusterUnhealthyWhenRestartThresholdHitInAverage() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW, Duration.ofMinutes(1));
         configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 100);
-        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant, 0);
-        var observedClusterHealthInfo2 =
-                createClusterHealthInfo(validInstant.plus(6, ChronoUnit.MINUTES), 122);
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 1);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 500, 1);
 
-        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
-                clusterInfo, observedClusterHealthInfo1);
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        assertClusterHealthIs(false);
+    }
+
+    @Test
+    public void evaluateShouldMarkClusterHealthyWhenNoCompletedCheckpointsInsideWindow() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(3));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 0);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
+        assertClusterHealthIs(true);
+    }
+
+    @Test
+    public void evaluateShouldMarkClusterUnhealthyWhenNoCompletedCheckpointsOutsideWindow() {
+        configuration.set(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
+        configuration.set(
+                OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ofMinutes(1));
+        var observedClusterHealthInfo1 = createClusterHealthInfo(validInstant1, 0, 0);
+        var observedClusterHealthInfo2 = createClusterHealthInfo(validInstant2, 0, 0);
+
+        setLastValidClusterHealthInfo(observedClusterHealthInfo1);
         clusterHealthEvaluator.evaluate(configuration, clusterInfo, observedClusterHealthInfo2);
         assertClusterHealthIs(false);
     }
-    private ClusterHealthInfo createClusterHealthInfo(Instant instant, int numRestarts) {
+    private ClusterHealthInfo createClusterHealthInfo(
+            Instant instant, int numRestarts, int numCompletedCheckpoints) {
         var clock = Clock.fixed(instant, ZoneId.systemDefault());
-        return ClusterHealthInfo.of(clock, numRestarts);
+        var clusterHealthInfo = new ClusterHealthInfo(clock);
+        clusterHealthInfo.setNumRestarts(numRestarts);
+        clusterHealthInfo.setNumCompletedCheckpoints(numCompletedCheckpoints);
+        return clusterHealthInfo;
+    }
+
+    private void setLastValidClusterHealthInfo(ClusterHealthInfo clusterHealthInfo) {
+        clusterHealthEvaluator.evaluate(configuration, clusterInfo, clusterHealthInfo);
+        assertEquals(
+                clusterHealthInfo,
+                ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo));
     }
private void assertClusterHealthIs(boolean healthy) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 4ca8618d..063986eb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -691,7 +691,10 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
         Assertions.assertEquals(MSG_SUBMIT, eventCollector.events.remove().getMessage());
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
 
-        var clusterHealthInfo = new ClusterHealthInfo(System.currentTimeMillis(), 2, false);
+        var clusterHealthInfo = new ClusterHealthInfo();
+        clusterHealthInfo.setTimeStamp(System.currentTimeMillis());
+        clusterHealthInfo.setNumRestarts(2);
+        clusterHealthInfo.setHealthy(false);
         ClusterHealthEvaluator.setLastValidClusterHealthInfo(
                 deployment.getStatus().getClusterInfo(), clusterHealthInfo);
         reconciler.reconcile(deployment, context);