This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 07f43a5c683 [FLINK-31974] Do not crash jobmanager on Kubernetes client errors
07f43a5c683 is described below
commit 07f43a5c68301ce119352d40cbec46b2c52a79a2
Author: Gyula Fora <[email protected]>
AuthorDate: Wed May 31 16:25:35 2023 +0200
[FLINK-31974] Do not crash jobmanager on Kubernetes client errors
---
.../KubernetesResourceManagerDriver.java | 12 ++++++++--
.../KubernetesResourceManagerDriverTest.java | 27 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
index 827fddce6c2..6450838f8cc 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
@@ -53,6 +53,8 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+
import javax.annotation.Nullable;
import java.io.File;
@@ -64,6 +66,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
/** Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
*/
public class KubernetesResourceManagerDriver
@@ -219,6 +222,10 @@ public class KubernetesResourceManagerDriver
if (t == null) {
return null;
}
+ // Unwrap CompletionException cause if any
+ if (t instanceof CompletionException &&
t.getCause() != null) {
+ t = t.getCause();
+ }
if (t instanceof CancellationException) {
requestResourceFutures.remove(taskManagerPod.getName());
@@ -228,8 +235,9 @@ public class KubernetesResourceManagerDriver
podName);
stopPod(taskManagerPod.getName());
}
- } else if (t instanceof RetryableException) {
- // ignore
+ } else if (t instanceof RetryableException
+ || t instanceof KubernetesClientException)
{
+ // ignore transient / retriable errors
} else {
log.error("Error completing resource
request.", t);
ExceptionUtils.rethrow(t);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
index 1a959c13274..52e27f0814d 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.client.KubernetesClientException;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -44,10 +45,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
@@ -223,6 +226,30 @@ class KubernetesResourceManagerDriverTest
};
}
+ @Test
+ void testKubernetesExceptionHandling() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () ->
+ assertThatCode(
+ () ->
+ runInMainThread(
+ () -> {
+
getDriver()
+
.requestResource(
+
TASK_EXECUTOR_PROCESS_SPEC)
+
.completeExceptionally(
+
new CompletionException(
+
new KubernetesClientException(
+
"test")));
+ })
+ .get())
+ .doesNotThrowAnyException());
+ }
+ };
+ }
+
@Test
void testRecoverPreviousAttemptWorkersPodTerminated() throws Exception {
new Context() {