This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 5452dcc [FLINK-24380][k8s] Terminate the pod if it failed
5452dcc is described below
commit 5452dccfb4e1e80a627647caea63aba3bda0fa79
Author: Yangze Guo <[email protected]>
AuthorDate: Mon Sep 27 15:51:04 2021 +0800
[FLINK-24380][k8s] Terminate the pod if it failed
This closes #17371.
---
.../kubeclient/resources/KubernetesPod.java | 31 ++++++++++++++-
.../kubeclient/resources/KubernetesPodTest.java | 45 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
index 8952865..85eb535 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
@@ -18,6 +18,8 @@
package org.apache.flink.kubernetes.kubeclient.resources;
+import org.apache.flink.annotation.VisibleForTesting;
+
import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
import io.fabric8.kubernetes.api.model.Pod;
@@ -37,8 +39,15 @@ public class KubernetesPod extends KubernetesResource<Pod> {
public boolean isTerminated() {
if (getInternalResource().getStatus() != null) {
- return getInternalResource().getStatus().getContainerStatuses().stream()
- .anyMatch(e -> e.getState() != null && e.getState().getTerminated() != null);
+ final boolean podFailed =
+ PodPhase.Failed.name().equals(getInternalResource().getStatus().getPhase());
+ final boolean containersFailed =
+ getInternalResource().getStatus().getContainerStatuses().stream()
+ .anyMatch(
+ e ->
+ e.getState() != null
+ && e.getState().getTerminated() != null);
+ return containersFailed || podFailed;
}
return false;
}
@@ -79,6 +88,24 @@ public class KubernetesPod extends KubernetesResource<Pod> {
.collect(Collectors.joining(",")));
}
sb.append("]");
+ if (PodPhase.Failed.name().equals(getInternalResource().getStatus().getPhase())) {
+ sb.append(
+ String.format(
+ ", pod status: %s(reason=%s, message=%s)",
+ getInternalResource().getStatus().getPhase(),
+ getInternalResource().getStatus().getReason(),
+ getInternalResource().getStatus().getMessage()));
+ }
return sb.toString();
}
+
+ /** The phase of a Pod, high-level summary of where the Pod is in its lifecycle. */
+ @VisibleForTesting
+ enum PodPhase {
+ Pending,
+ Running,
+ Succeeded,
+ Failed,
+ Unknown
+ }
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodTest.java
new file mode 100644
index 0000000..0c4bdd2
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.util.TestLogger;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Tests for {@link KubernetesPod}. */
+public class KubernetesPodTest extends TestLogger {
+
+ @Test
+ public void testIsTerminatedShouldReturnTrueWhenPodFailed() {
+ final Pod pod = new PodBuilder().build();
+ pod.setStatus(
+ new PodStatusBuilder()
+ .withPhase(KubernetesPod.PodPhase.Failed.name())
+ .withMessage("Pod Node didn't have enough resource")
+ .withReason("OutOfMemory")
+ .build());
+ assertThat(new KubernetesPod(pod).isTerminated(), is(true));
+ }
+}