This is an automated email from the ASF dual-hosted git repository.
morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 899baa22 [FLINK-30157] Trigger Events Before JM Recovery and Unhealthy Job Restarts
899baa22 is described below
commit 899baa22c21708179e8856396d816c168a970c0c
Author: Matyas Orhidi <[email protected]>
AuthorDate: Tue Nov 22 14:21:31 2022 -0800
[FLINK-30157] Trigger Events Before JM Recovery and Unhealthy Job Restarts
---
.../deployment/ApplicationReconciler.java | 17 ++++++++
.../kubernetes/operator/utils/EventRecorder.java | 4 +-
.../deployment/ApplicationReconcilerTest.java | 46 +++++++++++++++++++++-
3 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 2c39731c..4bb82c5d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -61,6 +61,8 @@ public class ApplicationReconciler
extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec,
FlinkDeploymentStatus> {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationReconciler.class);
+ /** Event message emitted (as a Warning, reason RecoverDeployment) before a lost deployment is resubmitted. */
+ static final String MSG_RECOVERY = "Recovering lost deployment";
+ /** Event message emitted (as a Warning, reason RestartUnhealthyJob) before an unhealthy job is cleaned up and resubmitted. */
+ static final String MSG_RESTART_UNHEALTHY = "Restarting unhealthy job";
protected final FlinkService flinkService;
public ApplicationReconciler(
@@ -275,7 +277,22 @@ public class ApplicationReconciler
shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
boolean shouldRecoverDeployment =
shouldRecoverDeployment(observeConfig, deployment);
if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) {
+ if (shouldRecoverDeployment) {
+ eventRecorder.triggerEvent(
+ deployment,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.RecoverDeployment,
+ EventRecorder.Component.Job,
+ MSG_RECOVERY);
+ }
+
if (shouldRestartJobBecauseUnhealthy) {
+ eventRecorder.triggerEvent(
+ deployment,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.RestartUnhealthyJob,
+ EventRecorder.Component.Job,
+ MSG_RESTART_UNHEALTHY);
cleanupAfterFailedJob(deployment, ctx, observeConfig);
}
resubmitJob(deployment, ctx, observeConfig, true);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index a444577e..81baf9b6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -126,6 +126,8 @@ public class EventRecorder {
Cleanup,
CleanupFailed,
Missing,
- ValidationError
+ ValidationError,
+ RecoverDeployment,
+ RestartUnhealthyJob
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 0eebdcd3..2d8d99b0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -44,7 +44,10 @@ import
org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
+import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointStatus;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -72,6 +75,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
+import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
+import static
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.MSG_SUBMIT;
+import static
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RECOVERY;
+import static
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RESTART_UNHEALTHY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -90,10 +97,12 @@ public class ApplicationReconcilerTest {
private Context<FlinkDeployment> context;
private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>
statusRecorder;
+    /** Collects the events emitted through the EventRecorder built in before(); never reassigned. */
+    private final EventCollector eventCollector = new EventCollector();
+
@BeforeEach
public void before() {
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
- var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ var eventRecorder = new EventRecorder(kubernetesClient,
eventCollector);
statusRecorder = new TestingStatusRecorder<FlinkDeployment,
FlinkDeploymentStatus>();
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
@@ -664,4 +673,39 @@ public class ApplicationReconcilerTest {
reconciler.reconcile(deployment, context);
assertEquals(JobManagerDeploymentStatus.MISSING,
status.getJobManagerDeploymentStatus());
}
+
+    /**
+     * Verifies that reconciling a deployment whose JobManager deployment is MISSING (while the job
+     * is RECONCILING) emits an event carrying {@code MSG_RECOVERY}.
+     */
+    @Test
+    public void testDeploymentRecoveryEvent() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        reconciler.reconcile(deployment, context);
+        assertEquals(MSG_SUBMIT, eventCollector.events.remove().getMessage());
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        // Reset the testing Flink service and mark the JobManager deployment as lost.
+        flinkService.clear();
+        FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        deploymentStatus
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+        reconciler.reconcile(deployment, context);
+        assertEquals(MSG_RECOVERY, eventCollector.events.remove().getMessage());
+    }
+
+    /**
+     * Verifies that, with the cluster health check enabled, reconciling a deployment whose last
+     * recorded cluster health info is unhealthy emits an event carrying {@code
+     * MSG_RESTART_UNHEALTHY}.
+     */
+    @Test
+    public void testRestartUnhealthyEvent() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(), "true");
+        reconciler.reconcile(deployment, context);
+        assertEquals(MSG_SUBMIT, eventCollector.events.remove().getMessage());
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        // NOTE(review): presumably (timestamp, numRestarts, healthy=false); confirm against
+        // ClusterHealthInfo's constructor.
+        var clusterHealthInfo = new ClusterHealthInfo(System.currentTimeMillis(), 2, false);
+        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
+                deployment.getStatus().getClusterInfo(), clusterHealthInfo);
+        reconciler.reconcile(deployment, context);
+        assertEquals(MSG_RESTART_UNHEALTHY, eventCollector.events.remove().getMessage());
+    }
}