This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1f60be8  [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
1f60be8 is described below

commit 1f60be8a39b4130de7e0e588d856623858d7b57e
Author: huangxingbo <[email protected]>
AuthorDate: Fri Mar 18 18:12:40 2022 +0800

    [FLINK-26727][python] Fix the implementation of sub-interpreter in Thread Mode
    
    This closes #19150.
---
 .../docs/dev/python/python_execution_mode.md       | 57 ++++++--------
 .../docs/dev/python/python_execution_mode.md       | 57 ++++++--------
 .../shortcodes/generated/python_configuration.html |  2 +-
 .../pyflink/table/tests/test_dependency.py         | 15 +---
 flink-python/pyflink/table/tests/test_udf.py       | 15 +---
 .../org/apache/flink/python/PythonOptions.java     |  9 +--
 .../embedded/EmbeddedPythonEnvironmentManager.java | 16 +---
 .../AbstractEmbeddedPythonFunctionOperator.java    |  8 +-
 .../EmbeddedPythonScalarFunctionOperator.java      | 90 ++++------------------
 9 files changed, 75 insertions(+), 194 deletions(-)
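
The documentation and `PythonOptions` changes in this commit reduce the accepted values of `python.execution-mode` to `process` and `thread`, and the embedded environment manager matches the value case-insensitively. A minimal sketch of that value handling follows. The class `ExecutionModeSketch`, its `Mode` enum, and the `parse` method are hypothetical names used only for illustration and are not part of Flink; the sketch does not model the fallback to `process` mode that the documentation describes.

```java
import java.util.Locale;

/** Hypothetical illustration, not Flink code: validates a python.execution-mode value. */
final class ExecutionModeSketch {

    /** The two modes that remain after this commit. */
    enum Mode { PROCESS, THREAD }

    /** Maps a configured string to a mode, ignoring case; rejects anything else. */
    static Mode parse(String value) {
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "process":
                return Mode.PROCESS;
            case "thread":
                return Mode.THREAD;
            default:
                // Values removed by this commit ("multi-thread", "sub-interpreter") land here.
                throw new IllegalArgumentException(
                        "Unsupported python.execution-mode: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("thread"));
        System.out.println(parse("PROCESS"));
    }
}
```

Jobs that still set one of the removed values would need to switch to `thread`; how the real operators react to an unknown value depends on code outside this diff.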

diff --git a/docs/content.zh/docs/dev/python/python_execution_mode.md b/docs/content.zh/docs/dev/python/python_execution_mode.md
index c34da24..4bb3eb1 100644
--- a/docs/content.zh/docs/dev/python/python_execution_mode.md
+++ b/docs/content.zh/docs/dev/python/python_execution_mode.md
@@ -31,33 +31,24 @@ defines how to execute your customized Python functions.
 Prior to release-1.15, there is the only execution mode called `PROCESS` 
execution mode. The `PROCESS`
 mode means that the Python user-defined functions will be executed in separate 
Python processes.
 
-In release-1.15, it has introduced another two execution modes called 
`MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the 
Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected 
by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will 
be executed in Python
-different sub-interpreters rather than different threads of one interpreter, 
which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support 
it, such as numpy, tensorflow, etc.
+In release-1.15, it has introduced a new execution mode called `THREAD` 
execution mode. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same 
process as Java Operator,
+It should be noted that multiple Python user-defined functions running in the 
same JVM are still affected by GIL.
 
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER 
execution mode?
+## When can/should I use THREAD execution mode?
 
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` 
mode is to overcome the
-overhead of serialization/deserialization and network communication caused in 
`PROCESS` mode.
-So if performance is not your concern, or the computing logic of your 
customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice 
as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode 
can largely overcome the
-effects of the GIL, so you can get better performance usually. However, 
`SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use 
`PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead 
of serialization/deserialization
+and network communication caused in `PROCESS` mode. So if performance is not 
your concern, or the computing
+logic of your customized Python functions is the performance bottleneck of the 
job, `PROCESS` mode will
+be the best choice as `PROCESS` mode provides the best isolation compared to 
`THREAD` mode.
 
 ## Configuring Python execution mode
 
 The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
 
  - `PROCESS`: The Python user-defined functions will be executed in separate 
Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the 
same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in 
Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same 
process as Java operator.
 
 You could specify the Python execution mode using Python Table API as 
following:
 
@@ -65,27 +56,23 @@ You could specify the Python execution mode using Python 
Table API as following:
 # Specify `PROCESS` mode
 table_env.get_config().get_configuration().set_string("python.execution-mode", 
"process")
 
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", 
"multi-thread")
-
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", 
"sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().get_configuration().set_string("python.execution-mode", 
"thread")
 ```
 
 {{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` 
and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. 
So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, 
it's actually executed in `PROCESS` execution mode.
+Currently, it still doesn't support to execute Python UDFs in `THREAD` 
execution mode in all places.
+It will fall back to `PROCESS` execution mode in these cases. So it may happen 
that you configure a job
+to execute in `THREAD` execution mode, however, it's actually executed in 
`PROCESS` execution mode.
 {{< /hint >}}
 {{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` 
execution mode only supports Python 3.8+.  
+`THREAD` execution mode is only supported in Python 3.7+.
 {{< /hint >}}
 
 ## Execution Behavior
 
-This section provides an overview of the execution behavior of `MULTI-THREAD` 
and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` 
execution mode and contrasts
+they with `PROCESS` execution mode. For more details, please refer to the FLIP 
that introduced this feature:
 
[FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
 
 #### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python 
worker process using vari
 
 {{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution 
Mode" >}}
 
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
 
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python 
user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library 
[PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed 
in the same process
+as Java operators. PyFlink takes use of third part library 
[PEMJA](https://github.com/alibaba/pemja)
+to embed Python in Java Application.
 
 {{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution 
Mode" >}}
diff --git a/docs/content/docs/dev/python/python_execution_mode.md b/docs/content/docs/dev/python/python_execution_mode.md
index 2c96958..4f5ecf2 100644
--- a/docs/content/docs/dev/python/python_execution_mode.md
+++ b/docs/content/docs/dev/python/python_execution_mode.md
@@ -31,33 +31,24 @@ defines how to execute your customized Python functions.
 Prior to release-1.15, there is the only execution mode called `PROCESS` 
execution mode. The `PROCESS`
 mode means that the Python user-defined functions will be executed in separate 
Python processes.
 
-In release-1.15, it has introduced another two execution modes called 
`MULTI-THREAD` execution mode and
-`SUB-INTERPRETER` execution mode. The `MULTI-THREAD` mode means that the 
Python user-defined functions
-will be executed in the same thread as Java Operator, but it will be affected 
by GIL performance.
-The `SUB-INTERPRETER` mode means that the Python user-defined functions will 
be executed in Python
-different sub-interpreters rather than different threads of one interpreter, 
which can largely overcome
-the effects of the GIL, but some CPython extensions libraries doesn't support 
it, such as numpy, tensorflow, etc.
+In release-1.15, it has introduced a new execution mode called `THREAD` 
execution mode. The `THREAD`
+mode means that the Python user-defined functions will be executed in the same 
process as Java Operator,
+It should be noted that multiple Python user-defined functions running in the 
same JVM are still affected by GIL.
 
-## When can/should I use MULTI-THREAD execution mode or SUB-INTERPRETER 
execution mode?
+## When can/should I use THREAD execution mode?
 
-The purpose of the introduction of `MULTI-THREAD` mode and `SUB-INTERPRETER` 
mode is to overcome the
-overhead of serialization/deserialization and network communication caused in 
`PROCESS` mode.
-So if performance is not your concern, or the computing logic of your 
customized Python functions is
-the performance bottleneck of the job, `PROCESS` mode will be the best choice 
as `PROCESS` mode provides
-the best isolation compared to `MULTI-THREAD` mode and `SUB-INTERPRETER` mode.
-
-Compared to `MULTI-THREAD` execution mode, `SUB-INTERPRETER` execution mode 
can largely overcome the
-effects of the GIL, so you can get better performance usually. However, 
`SUB-INTERPRETER` may fail in some CPython
-extensions libraries, such as numpy, tensorflow. In this case, you should use 
`PROCESS` mode or `MULTI-THREAD` mode.
+The purpose of the introduction of `THREAD` mode is to overcome the overhead 
of serialization/deserialization
+and network communication caused in `PROCESS` mode. So if performance is not 
your concern, or the computing
+logic of your customized Python functions is the performance bottleneck of the 
job, `PROCESS` mode will
+be the best choice as `PROCESS` mode provides the best isolation compared to 
`THREAD` mode.
 
 ## Configuring Python execution mode
 
 The execution mode can be configured via the `python.execution-mode` setting.
-There are three possible values:
+There are two possible values:
 
  - `PROCESS`: The Python user-defined functions will be executed in separate 
Python process. (default)
- - `MULTI-THREAD`: The Python user-defined functions will be executed in the 
same thread as Java Operator.
- - `SUB-INTERPRETER`: The Python user-defined functions will be executed in 
Python different sub-interpreters.
+ - `THREAD`: The Python user-defined functions will be executed in the same 
process as Java operator.
 
 You could specify the Python execution mode using Python Table API as 
following:
 
@@ -65,27 +56,23 @@ You could specify the Python execution mode using Python 
Table API as following:
 # Specify `PROCESS` mode
 table_env.get_config().get_configuration().set_string("python.execution-mode", 
"process")
 
-# Specify `MULTI-THREAD` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", 
"multi-thread")
-
-# Specify `SUB-INTERPRETER` mode
-table_env.get_config().get_configuration().set_string("python.execution-mode", 
"sub-interpreter")
+# Specify `THREAD` mode
+table_env.get_config().get_configuration().set_string("python.execution-mode", 
"thread")
 ```
 
 {{< hint info >}}
-Currently, it still doesn't support to execute Python UDFs in `MULTI-THREAD` 
and `SUB-INTERPRETER` execution mode
-in all places. It will fall back to `PROCESS` execution mode in these cases. 
So it may happen that you configure a job
-to execute in `MULTI-THREAD` or `SUB-INTERPRETER` execution modes, however, 
it's actually executed in `PROCESS` execution mode.
+Currently, it still doesn't support to execute Python UDFs in `THREAD` 
execution mode in all places.
+It will fall back to `PROCESS` execution mode in these cases. So it may happen 
that you configure a job
+to execute in `THREAD` execution mode, however, it's actually executed in 
`PROCESS` execution mode.
 {{< /hint >}}
 {{< hint info >}}
-`MULTI-THREAD` execution mode only supports Python 3.7+. `SUB-INTERPRETER` 
execution mode only supports Python 3.8+.  
+`THREAD` execution mode is only supported in Python 3.7+.
 {{< /hint >}}
 
 ## Execution Behavior
 
-This section provides an overview of the execution behavior of `MULTI-THREAD` 
and `SUB-INTERPRETER`
-execution mode and contrasts they with `PROCESS` execution mode. For more
-details, please refer to the FLIP that introduced this feature:
+This section provides an overview of the execution behavior of `THREAD` 
execution mode and contrasts
+they with `PROCESS` execution mode. For more details, please refer to the FLIP 
that introduced this feature:
 
[FLIP-206](https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode).
 
 #### PROCESS Execution Mode
@@ -95,10 +82,10 @@ The Java operator process communicates with the Python 
worker process using vari
 
 {{< img src="/fig/pyflink_process_execution_mode.png" alt="Process Execution 
Mode" >}}
 
-#### MULTI-THREAD and SUB-INTERPRETER Execution Mode
+#### THREAD Execution Mode
 
-In `MULTI-THREAD` and `SUB-INTERPRETER` execution mode, the Python 
user-defined functions will be executed in
-the same process as Java operators. PyFlink takes use of third part library 
[PEMJA](https://github.com/alibaba/pemja) to
-embed Python in Java Application.
+In `THREAD` execution mode, the Python user-defined functions will be executed 
in the same process
+as Java operators. PyFlink takes use of third part library 
[PEMJA](https://github.com/alibaba/pemja)
+to embed Python in Java Application.
 
 {{< img src="/fig/pyflink_embedded_execution_mode.png" alt="Embedded Execution 
Mode" >}}
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index 9ad42d8..cdaabcf 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -30,7 +30,7 @@
             <td><h5>python.execution-mode</h5></td>
             <td style="word-wrap: break-word;">"process"</td>
             <td>String</td>
-            <td>Specify the python runtime execution mode. The optional values 
are `process`, `multi-thread` and `sub-interpreter`. The `process` mode means 
that the Python user-defined functions will be executed in separate Python 
process. The `multi-thread` mode means that the Python user-defined functions 
will be executed in the same thread as Java Operator, but it will be affected 
by GIL performance. The `sub-interpreter` mode means that the Python 
user-defined functions will be exec [...]
+            <td>Specify the python runtime execution mode. The optional values 
are `process` and `thread`. The `process` mode means that the Python 
user-defined functions will be executed in separate Python process. The 
`thread` mode means that the Python user-defined functions will be executed in 
the same process of the Java operator. Note that currently it still doesn't 
support to execute Python user-defined functions in `thread` mode in all 
places. It will fall back to `process` mode  [...]
         </tr>
         <tr>
             <td><h5>python.files</h5></td>
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b180c72..5c4ae84 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -93,19 +93,10 @@ class DependencyTests(object):
 
 
 @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
-class EmbeddedMultiThreadDependencyTests(DependencyTests, 
PyFlinkStreamTableTestCase):
+class EmbeddedThreadDependencyTests(DependencyTests, 
PyFlinkStreamTableTestCase):
     def setUp(self):
-        super(EmbeddedMultiThreadDependencyTests, self).setUp()
-        
self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               "multi-thread")
-
-
-@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8")
-class EmbeddedSubInterpreterDependencyTests(DependencyTests, 
PyFlinkStreamTableTestCase):
-    def setUp(self):
-        super(EmbeddedSubInterpreterDependencyTests, self).setUp()
-        
self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               
"sub-interpreter")
+        super(EmbeddedThreadDependencyTests, self).setUp()
+        
self.t_env.get_config().get_configuration().set_string("python.execution-mode", 
"thread")
 
 
 class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 6abdee2..3e7562a 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -797,19 +797,10 @@ class 
PyFlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
 
 
 @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
-class PyFlinkEmbeddedMultiThreadTests(UserDefinedFunctionTests, 
PyFlinkBatchTableTestCase):
+class PyFlinkEmbeddedThreadTests(UserDefinedFunctionTests, 
PyFlinkBatchTableTestCase):
     def setUp(self):
-        super(PyFlinkEmbeddedMultiThreadTests, self).setUp()
-        
self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               "multi-thread")
-
-
-@pytest.mark.skipif(sys.version_info < (3, 8), reason="requires python3.8")
-class PyFlinkEmbeddedSubInterpreterTests(UserDefinedFunctionTests, 
PyFlinkBatchTableTestCase):
-    def setUp(self):
-        super(PyFlinkEmbeddedSubInterpreterTests, self).setUp()
-        
self.t_env.get_config().get_configuration().set_string("python.execution-mode",
-                                                               
"sub-interpreter")
+        super(PyFlinkEmbeddedThreadTests, self).setUp()
+        
self.t_env.get_config().get_configuration().set_string("python.execution-mode", 
"thread")
 
 
 # test specify the input_types
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 649e2e6..78cd3b5 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -231,10 +231,9 @@ public class PythonOptions {
                     .stringType()
                     .defaultValue("process")
                     .withDescription(
-                            "Specify the python runtime execution mode. The 
optional values are `process`, `multi-thread` and `sub-interpreter`. "
+                            "Specify the python runtime execution mode. The 
optional values are `process` and `thread`. "
                                     + "The `process` mode means that the 
Python user-defined functions will be executed in separate Python process. "
-                                    + "The `multi-thread` mode means that the 
Python user-defined functions will be executed in the same thread as Java 
Operator, but it will be affected by GIL performance. "
-                                    + "The `sub-interpreter` mode means that 
the Python user-defined functions will be executed in python different 
sub-interpreters rather than different threads of one interpreter, "
-                                    + "which can largely overcome the effects 
of the GIL, but it maybe fail in some CPython extensions libraries, such as 
numpy, tensorflow. "
-                                    + "Note that if the python operator dose 
not support `multi-thread` and `sub-interpreter` mode, we will still use 
`process` mode.");
+                                    + "The `thread` mode means that the Python 
user-defined functions will be executed in the same process of the Java 
operator. "
+                                    + "Note that currently it still doesn't 
support to execute Python user-defined functions in `thread` mode in all 
places. "
+                                    + "It will fall back to `process` mode in 
these cases.");
 }
diff --git a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
index 1d7e9a0..750284e 100644
--- a/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
+++ b/flink-python/src/main/java/org/apache/flink/python/env/embedded/EmbeddedPythonEnvironmentManager.java
@@ -54,9 +54,7 @@ public class EmbeddedPythonEnvironmentManager extends 
AbstractPythonEnvironmentM
 
         String executionMode = dependencyInfo.getExecutionMode();
 
-        if (executionMode.equalsIgnoreCase("sub-interpreter")) {
-            execType = PythonInterpreterConfig.ExecType.SUB_INTERPRETER;
-        } else if (executionMode.equalsIgnoreCase("multi-thread")) {
+        if (executionMode.equalsIgnoreCase("thread")) {
             execType = PythonInterpreterConfig.ExecType.MULTI_THREAD;
         } else {
             throw new RuntimeException(
@@ -66,16 +64,8 @@ public class EmbeddedPythonEnvironmentManager extends 
AbstractPythonEnvironmentM
         String pythonVersion =
                 
PythonEnvironmentManagerUtils.getPythonVersion(dependencyInfo.getPythonExec());
 
-        if (execType == PythonInterpreterConfig.ExecType.SUB_INTERPRETER) {
-            if (pythonVersion.compareTo("3.8") < 0) {
-                throw new RuntimeException(
-                        "`SUB-INTERPRETER` execution mode only supports Python 
3.8+");
-            }
-        } else {
-            if (pythonVersion.compareTo("3.7") < 0) {
-                throw new RuntimeException(
-                        "`MULTI-THREAD` execution mode only supports Python 
3.7+");
-            }
+        if (pythonVersion.compareTo("3.7") < 0) {
+            throw new RuntimeException("`THREAD` execution mode only supports 
Python 3.7+");
         }
 
         if (env.containsKey("FLINK_TESTING")) {
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
index f23e3d2..be709e6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractEmbeddedPythonFunctionOperator.java
@@ -105,7 +105,7 @@ public abstract class 
AbstractEmbeddedPythonFunctionOperator<OUT>
             }
         }
 
-        openPythonInterpreter(pythonConfig.getPythonExec(), env, 
interpreterConfig.getExecType());
+        openPythonInterpreter(pythonConfig.getPythonExec(), env);
     }
 
     @Override
@@ -152,11 +152,7 @@ public abstract class 
AbstractEmbeddedPythonFunctionOperator<OUT>
     }
 
     /** Setup method for Python Interpreter. It can be used for initialization 
work. */
-    public abstract void openPythonInterpreter(
-            String pythonExecutable,
-            Map<String, String> env,
-            PythonInterpreterConfig.ExecType execType)
-            throws Exception;
+    public abstract void openPythonInterpreter(String pythonExecutable, 
Map<String, String> env);
 
     /** Returns the {@link PythonEnv} used to create PythonEnvironmentManager. 
*/
     public abstract PythonEnv getPythonEnv();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
index 848b678..3c843cd 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.java
@@ -40,13 +40,6 @@ import 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
-import pemja.core.PythonInterpreterConfig;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -156,46 +149,21 @@ public class EmbeddedPythonScalarFunctionOperator
     }
 
     @Override
-    public void openPythonInterpreter(
-            String pythonExecutable,
-            Map<String, String> env,
-            PythonInterpreterConfig.ExecType execType)
-            throws Exception {
-        if (execType.equals(PythonInterpreterConfig.ExecType.SUB_INTERPRETER)) 
{
-            LOG.info("Create Operation in sub-interpreters.");
-            String[] commands =
-                    new String[] {
-                        pythonExecutable,
-                        "-c",
-                        String.format(
-                                "from 
pyflink.fn_execution.utils.operation_utils import 
create_serialized_scalar_operation_from_proto;"
-                                        + 
"print(create_serialized_scalar_operation_from_proto(%s, %s, %s))",
-                                
Arrays.toString(getUserDefinedFunctionsProto().toByteArray()),
-                                isOneArg ? "True" : "False",
-                                isOneFieldResult ? "True" : "False")
-                    };
-            interpreter.exec(
-                    "from pyflink.fn_execution.utils.operation_utils import 
deserialized_operation_from_serialized_bytes");
-            interpreter.exec(
-                    String.format(
-                            "scalar_operation = 
deserialized_operation_from_serialized_bytes(%s)",
-                            executeScript(commands, env)));
-        } else {
-            LOG.info("Create Operation in multi-threads.");
-
-            // The CPython extension included in proto does not support 
initialization
-            // multiple times, so we choose the only interpreter process to be 
responsible for
-            // initialization and proto parsing. The only interpreter parses 
the proto and
-            // serializes function operations with cloudpickle.
-            interpreter.exec(
-                    "from pyflink.fn_execution.utils.operation_utils import 
create_scalar_operation_from_proto");
-            interpreter.set("proto", 
getUserDefinedFunctionsProto().toByteArray());
-
-            interpreter.exec(
-                    String.format(
-                            "scalar_operation = 
create_scalar_operation_from_proto(proto, %s, %s)",
-                            isOneArg ? "True" : "False", isOneFieldResult ? 
"True" : "False"));
-        }
+    public void openPythonInterpreter(String pythonExecutable, Map<String, 
String> env) {
+        LOG.info("Create Operation in multi-threads.");
+
+        // The CPython extension included in proto does not support 
initialization
+        // multiple times, so we choose the only interpreter process to be 
responsible for
+        // initialization and proto parsing. The only interpreter parses the 
proto and
+        // serializes function operations with cloudpickle.
+        interpreter.exec(
+                "from pyflink.fn_execution.utils.operation_utils import 
create_scalar_operation_from_proto");
+        interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
+
+        interpreter.exec(
+                String.format(
+                        "scalar_operation = 
create_scalar_operation_from_proto(proto, %s, %s)",
+                        isOneArg ? "True" : "False", isOneFieldResult ? "True" 
: "False"));
 
         // invoke `open` method of ScalarOperation.
         interpreter.invokeMethod("scalar_operation", "open");
@@ -272,32 +240,4 @@ public class EmbeddedPythonScalarFunctionOperator
         builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         return builder.build();
     }
-
-    private String executeScript(final String[] commands, Map<String, String> 
env)
-            throws IOException {
-        ProcessBuilder pb = new ProcessBuilder(commands);
-        pb.environment().putAll(env);
-        pb.redirectErrorStream(true);
-        Process p = pb.start();
-        InputStream in = new BufferedInputStream(p.getInputStream());
-        StringBuilder out = new StringBuilder();
-        String s;
-        try (BufferedReader br = new BufferedReader(new 
InputStreamReader(in))) {
-            while ((s = br.readLine()) != null) {
-                out.append(s).append("\n");
-            }
-        }
-        try {
-            if (p.waitFor() != 0) {
-                throw new IOException(
-                        String.format(
-                                "Failed to execute the command: %s\noutput: 
%s",
-                                String.join(" ", commands), out));
-            }
-        } catch (InterruptedException e) {
-            // Ignored. The subprocess is dead after "br.readLine()" returns 
null, so the call of
-            // "waitFor" should return intermediately.
-        }
-        return out.toString();
-    }
 }
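
A note on the version check retained in `EmbeddedPythonEnvironmentManager`: `pythonVersion.compareTo("3.7") < 0` compares strings lexicographically, so if `getPythonVersion` returns a string such as `3.10`, the check would treat it as older than `3.7`. The sketch below shows a numeric comparison as one possible alternative. It assumes the version is a purely numeric dotted string; the class `PythonVersionCheck` and its methods are hypothetical and not part of Flink.

```java
import java.util.Arrays;

/** Hypothetical helper, not Flink code: compares dotted version strings numerically. */
final class PythonVersionCheck {

    /** Parses "3.10" or "3.10.4" into [3, 10] or [3, 10, 4]. */
    static int[] parse(String version) {
        return Arrays.stream(version.trim().split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    /** Returns true if version >= major.minor, comparing components as integers. */
    static boolean isAtLeast(String version, int major, int minor) {
        int[] parts = parse(version);
        int actualMajor = parts.length > 0 ? parts[0] : 0;
        int actualMinor = parts.length > 1 ? parts[1] : 0;
        if (actualMajor != major) {
            return actualMajor > major;
        }
        return actualMinor >= minor;
    }

    public static void main(String[] args) {
        // Lexicographic comparison: '1' sorts before '7', so "3.10" looks older than "3.7".
        System.out.println("3.10".compareTo("3.7") < 0); // prints "true"
        // Numeric comparison: 10 >= 7.
        System.out.println(isAtLeast("3.10", 3, 7));     // prints "true"
        System.out.println(isAtLeast("3.6", 3, 7));      // prints "false"
    }
}
```

Version strings with non-numeric suffixes (for example a release-candidate tag) would make `Integer.parseInt` throw, so a real check would need to decide how to handle them.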
