This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new caaa476 [FLINK-20365][python] The native k8s cluster could not be
unregistered when executing Python DataStream jobs in attach mode
caaa476 is described below
commit caaa476a89549959f61e9e1b81694b1a7a4bef57
Author: acqua.csq <[email protected]>
AuthorDate: Thu Nov 26 17:28:31 2020 +0800
[FLINK-20365][python] The native k8s cluster could not be unregistered when
executing Python DataStream jobs in attach mode
This closes 14232.
---
flink-python/pyflink/util/exceptions.py | 3 +++
.../java/org/apache/flink/client/python/PythonDriver.java | 13 ++++++++-----
.../java/org/apache/flink/client/python/PythonEnvUtils.java | 10 ++++++++++
3 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/flink-python/pyflink/util/exceptions.py
b/flink-python/pyflink/util/exceptions.py
index fa5a69b..c173780 100644
--- a/flink-python/pyflink/util/exceptions.py
+++ b/flink-python/pyflink/util/exceptions.py
@@ -146,6 +146,9 @@ def capture_java_exception(f):
try:
return f(*a, **kw)
except Py4JJavaError as e:
+ from pyflink.java_gateway import get_gateway
+ get_gateway().jvm.org.apache.flink.client.python.PythonEnvUtils\
+ .setPythonException(e.java_exception)
s = e.java_exception.toString()
stack_trace = '\n\t at '.join(map(lambda x: x.toString(),
e.java_exception.getStackTrace()))
diff --git
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
index fc8b906..9eaa701 100644
---
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
+++
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -33,7 +33,6 @@ import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
/**
* A main class used to launch Python applications. It executes python as a
@@ -42,7 +41,7 @@ import java.util.concurrent.ExecutionException;
public final class PythonDriver {
private static final Logger LOG =
LoggerFactory.getLogger(PythonDriver.class);
- public static void main(String[] args) throws ExecutionException,
InterruptedException {
+ public static void main(String[] args) throws Throwable {
// The python job needs at least 2 args.
// e.g. py a.py [user args]
// e.g. pym a.b [user args]
@@ -106,9 +105,13 @@ public final class PythonDriver {
} catch (Throwable e) {
LOG.error("Run python process failed", e);
- // throw ProgramAbortException if the caller is
interested in the program plan,
- // there is no harm to throw ProgramAbortException even
if it is not the case.
- throw new ProgramAbortException();
+ if (PythonEnvUtils.capturedJavaException != null) {
+ throw PythonEnvUtils.capturedJavaException;
+ } else {
+ // throw ProgramAbortException if the caller is
interested in the program plan,
+ // there is no harm to throw
ProgramAbortException even if it is not the case.
+ throw new ProgramAbortException();
+ }
} finally {
PythonEnvUtils.setGatewayServer(null);
gatewayServer.shutdown();
diff --git
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 2c7a169..97bdb2d 100644
---
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -18,9 +18,11 @@
package org.apache.flink.client.python;
+import
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.OperatingSystem;
@@ -73,6 +75,8 @@ final class PythonEnvUtils {
static final String PYFLINK_CLIENT_EXECUTABLE =
"PYFLINK_CLIENT_EXECUTABLE";
+ static volatile Throwable capturedJavaException = null;
+
/**
* Wraps Python exec environment.
*/
@@ -375,4 +379,10 @@ final class PythonEnvUtils {
// start the python process.
return PythonEnvUtils.startPythonProcess(pythonEnv, commands,
redirectToPipe);
}
+
+ public static void setPythonException(Throwable pythonException) {
+ if (ExceptionUtils.findThrowable(pythonException,
UnsuccessfulExecutionException.class).isPresent()) {
+ capturedJavaException = pythonException;
+ }
+ }
}