This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 07f43a5c683 [FLINK-31974] Do not crash jobmanager on Kubernetes client errors
07f43a5c683 is described below

commit 07f43a5c68301ce119352d40cbec46b2c52a79a2
Author: Gyula Fora <[email protected]>
AuthorDate: Wed May 31 16:25:35 2023 +0200

    [FLINK-31974] Do not crash jobmanager on Kubernetes client errors
---
 .../KubernetesResourceManagerDriver.java           | 12 ++++++++--
 .../KubernetesResourceManagerDriverTest.java       | 27 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
index 827fddce6c2..6450838f8cc 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
@@ -53,6 +53,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import io.fabric8.kubernetes.client.KubernetesClientException;
+
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -64,6 +66,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 /** Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. 
*/
 public class KubernetesResourceManagerDriver
@@ -219,6 +222,10 @@ public class KubernetesResourceManagerDriver
                             if (t == null) {
                                 return null;
                             }
+                            // Unwrap CompletionException cause if any
+                            if (t instanceof CompletionException && 
t.getCause() != null) {
+                                t = t.getCause();
+                            }
                             if (t instanceof CancellationException) {
 
                                 
requestResourceFutures.remove(taskManagerPod.getName());
@@ -228,8 +235,9 @@ public class KubernetesResourceManagerDriver
                                             podName);
                                     stopPod(taskManagerPod.getName());
                                 }
-                            } else if (t instanceof RetryableException) {
-                                // ignore
+                            } else if (t instanceof RetryableException
+                                    || t instanceof KubernetesClientException) 
{
+                                // ignore transient / retriable errors
                             } else {
                                 log.error("Error completing resource 
request.", t);
                                 ExceptionUtils.rethrow(t);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
index 1a959c13274..52e27f0814d 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.client.KubernetesClientException;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -44,10 +45,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -223,6 +226,30 @@ class KubernetesResourceManagerDriverTest
         };
     }
 
+    @Test
+    void testKubernetesExceptionHandling() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () ->
+                                assertThatCode(
+                                                () ->
+                                                        runInMainThread(
+                                                                        () -> {
+                                                                            
getDriver()
+                                                                               
     .requestResource(
+                                                                               
             TASK_EXECUTOR_PROCESS_SPEC)
+                                                                               
     .completeExceptionally(
+                                                                               
             new CompletionException(
+                                                                               
                     new KubernetesClientException(
+                                                                               
                             "test")));
+                                                                        })
+                                                                .get())
+                                        .doesNotThrowAnyException());
+            }
+        };
+    }
+
     @Test
     void testRecoverPreviousAttemptWorkersPodTerminated() throws Exception {
         new Context() {

Reply via email to