This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new caaa476  [FLINK-20365][python] The native k8s cluster could not be 
unregistered when executing Python DataStream jobs in attach mode
caaa476 is described below

commit caaa476a89549959f61e9e1b81694b1a7a4bef57
Author: acqua.csq <[email protected]>
AuthorDate: Thu Nov 26 17:28:31 2020 +0800

    [FLINK-20365][python] The native k8s cluster could not be unregistered when 
executing Python DataStream jobs in attach mode
    
    This closes 14232.
---
 flink-python/pyflink/util/exceptions.py                     |  3 +++
 .../java/org/apache/flink/client/python/PythonDriver.java   | 13 ++++++++-----
 .../java/org/apache/flink/client/python/PythonEnvUtils.java | 10 ++++++++++
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/flink-python/pyflink/util/exceptions.py 
b/flink-python/pyflink/util/exceptions.py
index fa5a69b..c173780 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -146,6 +146,9 @@ def capture_java_exception(f):
         try:
             return f(*a, **kw)
         except Py4JJavaError as e:
+            from pyflink.java_gateway import get_gateway
+            get_gateway().jvm.org.apache.flink.client.python.PythonEnvUtils\
+                .setPythonException(e.java_exception)
             s = e.java_exception.toString()
             stack_trace = '\n\t at '.join(map(lambda x: x.toString(),
                                               
e.java_exception.getStackTrace()))
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
index fc8b906..9eaa701 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -33,7 +33,6 @@ import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 
 /**
  * A main class used to launch Python applications. It executes python as a
@@ -42,7 +41,7 @@ import java.util.concurrent.ExecutionException;
 public final class PythonDriver {
        private static final Logger LOG = 
LoggerFactory.getLogger(PythonDriver.class);
 
-       public static void main(String[] args) throws ExecutionException, 
InterruptedException {
+       public static void main(String[] args) throws Throwable {
                // The python job needs at least 2 args.
                // e.g. py a.py [user args]
                // e.g. pym a.b [user args]
@@ -106,9 +105,13 @@ public final class PythonDriver {
                } catch (Throwable e) {
                        LOG.error("Run python process failed", e);
 
-                       // throw ProgramAbortException if the caller is 
interested in the program plan,
-                       // there is no harm to throw ProgramAbortException even 
if it is not the case.
-                       throw new ProgramAbortException();
+                       if (PythonEnvUtils.capturedJavaException != null) {
+                               throw PythonEnvUtils.capturedJavaException;
+                       } else {
+                               // throw ProgramAbortException if the caller is 
interested in the program plan,
+                               // there is no harm to throw 
ProgramAbortException even if it is not the case.
+                               throw new ProgramAbortException();
+                       }
                } finally {
                        PythonEnvUtils.setGatewayServer(null);
                        gatewayServer.shutdown();
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 2c7a169..97bdb2d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.client.python;
 
+import 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.OperatingSystem;
@@ -73,6 +75,8 @@ final class PythonEnvUtils {
 
        static final String PYFLINK_CLIENT_EXECUTABLE = 
"PYFLINK_CLIENT_EXECUTABLE";
 
+       static volatile Throwable capturedJavaException = null;
+
        /**
         * Wraps Python exec environment.
         */
@@ -375,4 +379,10 @@ final class PythonEnvUtils {
                // start the python process.
                return PythonEnvUtils.startPythonProcess(pythonEnv, commands, 
redirectToPipe);
        }
+
+       public static void setPythonException(Throwable pythonException) {
+               if (ExceptionUtils.findThrowable(pythonException, 
UnsuccessfulExecutionException.class).isPresent()) {
+                       capturedJavaException = pythonException;
+               }
+       }
 }

Reply via email to