This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d9e6abf  [FLINK-24123][python] Optimize the python operator instances 
of the same job to share one python environment resources in the same jvm
d9e6abf is described below

commit d9e6abf83d5187095403532ebbbd01aa6837a556
Author: huangxingbo <[email protected]>
AuthorDate: Tue Sep 7 17:24:15 2021 +0800

    [FLINK-24123][python] Optimize the python operator instances of the same 
job to share one python environment resources in the same jvm
    
    This closes #17180.
---
 docs/content.zh/docs/dev/python/debugging.md       |   2 -
 docs/content/docs/dev/python/debugging.md          |   2 -
 flink-python/dev/integration_test.sh               |   3 +-
 .../datastream/stream_execution_environment.py     |  19 +-
 .../fn_execution/beam/beam_worker_pool_service.py  |  41 +++-
 flink-python/pyflink/table/table_environment.py    |  22 +-
 .../pyflink/table/tests/test_dependency.py         |   1 -
 .../env/beam/ProcessPythonEnvironmentManager.java  | 255 +++++++++++++++------
 .../python/AbstractPythonFunctionOperator.java     |   3 +-
 .../beam/ProcessPythonEnvironmentManagerTest.java  |  42 ++--
 .../flink/table/runtime/utils/PythonTestUtils.java |   4 +-
 11 files changed, 261 insertions(+), 133 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/debugging.md 
b/docs/content.zh/docs/dev/python/debugging.md
index b7a5d99..2d54424 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -85,8 +85,6 @@ $ python -c "import pyflink;import 
os;print(os.path.dirname(os.path.abspath(pyfl
 
 你可以直接在 PyCharm 等 IDE 调试你的 Python 函数。
 
-**注意:** 当前,如果你使用了配置 `python-archives`,并且作业的并发度是大于`1`的,只能够使用[远程调试](#远程调试)的方式。
-
 ### 远程调试
 
 
你可以利用PyCharm提供的[`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/)工具进行Python
 UDF的调试
diff --git a/docs/content/docs/dev/python/debugging.md 
b/docs/content/docs/dev/python/debugging.md
index c6963ef..cd58365 100644
--- a/docs/content/docs/dev/python/debugging.md
+++ b/docs/content/docs/dev/python/debugging.md
@@ -89,8 +89,6 @@ $ python -c "import pyflink;import 
os;print(os.path.dirname(os.path.abspath(pyfl
 
 You can debug your python functions directly in IDEs such as PyCharm.
 
-**Note:** Currently, if you use `python-archives` in the job and the 
parallelism of the job is greater than `1`, you can only use [remote 
debug](#remote-debug) mode.
-
 ### Remote Debug
 
 You can make use of the 
[`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/) tool of PyCharm to 
debug Python UDFs.
diff --git a/flink-python/dev/integration_test.sh 
b/flink-python/dev/integration_test.sh
index 36c0cae..c358632 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -37,8 +37,7 @@ FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
 test_module "common"
 
 # test datastream module
-test_module "datastream" "--ignore 
$FLINK_PYTHON_DIR/pyflink/datastream/tests/test_stream_execution_environment.py"
-pytest 
"$FLINK_PYTHON_DIR/pyflink/datastream/tests/test_stream_execution_environment.py"
+test_module "datastream"
 
 # test fn_execution module
 test_module "fn_execution"
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py 
b/flink-python/pyflink/datastream/stream_execution_environment.py
index 248c4fb..1d299da 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -873,21 +873,12 @@ class StreamExecutionEnvironment(object):
         j_configuration = 
get_j_env_configuration(self._j_stream_execution_environment)
 
         def startup_loopback_server():
+            from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                BeamFnLoopbackWorkerPoolServicer
             jvm = gateway.jvm
-            env_config = JPythonConfigUtil.getEnvironmentConfig(
-                self._j_stream_execution_environment)
-            parallelism = self.get_parallelism()
-            if parallelism > 1 and 
env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
-                import logging
-                logging.warning("Loopback mode is disabled as python archives 
are used and the "
-                                "parallelism of the job is greater than 1. The 
Python user-defined "
-                                "functions will be executed in an independent 
Python process.")
-            else:
-                from pyflink.fn_execution.beam.beam_worker_pool_service import 
\
-                    BeamFnLoopbackWorkerPoolServicer
-                j_env = jvm.System.getenv()
-                get_field_value(j_env, "m").put(
-                    'PYFLINK_LOOPBACK_SERVER_ADDRESS', 
BeamFnLoopbackWorkerPoolServicer().start())
+            j_env = jvm.System.getenv()
+            get_field_value(j_env, "m").put(
+                'PYFLINK_LOOPBACK_SERVER_ADDRESS', 
BeamFnLoopbackWorkerPoolServicer().start())
 
         python_worker_execution_mode = None
         if hasattr(self, "_python_worker_execution_mode"):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py 
b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
index 2d61a55..3d0e868 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
@@ -62,6 +62,9 @@ class 
BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
     def __init__(self):
         self._parse_param_lock = threading.Lock()
         self._worker_address = None
+        self._old_working_dir = None
+        self._old_python_path = None
+        self._ref_cnt = 0
 
     def start(self):
         if not self._worker_address:
@@ -91,17 +94,20 @@ class 
BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
 
     def _start_sdk_worker_main(self, start_worker_request: 
beam_fn_api_pb2.StartWorkerRequest):
         params = start_worker_request.params
-        base_dir = None
         self._parse_param_lock.acquire()
-        if 'PYTHONPATH' in params:
-            python_path_list = params['PYTHONPATH'].split(':')
-            python_path_list.reverse()
-            for path in python_path_list:
-                sys.path.insert(0, path)
-        if '_PYTHON_WORKING_DIR' in params:
-            base_dir = os.getcwd()
-            os.chdir(params['_PYTHON_WORKING_DIR'])
-        os.environ.update(params)
+        # The first thread to start is responsible for preparing the execution environment.
+        if not self._ref_cnt:
+            if 'PYTHONPATH' in params:
+                self._old_python_path = sys.path[:]
+                python_path_list = params['PYTHONPATH'].split(':')
+                python_path_list.reverse()
+                for path in python_path_list:
+                    sys.path.insert(0, path)
+            if '_PYTHON_WORKING_DIR' in params:
+                self._old_working_dir = os.getcwd()
+                os.chdir(params['_PYTHON_WORKING_DIR'])
+            os.environ.update(params)
+        self._ref_cnt += 1
         self._parse_param_lock.release()
 
         # read job information from provision stub
@@ -151,7 +157,18 @@ class 
BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
             _LOGGER.exception('Python sdk harness failed: ')
             raise
         finally:
-            if base_dir:
-                os.chdir(base_dir)
+            self._parse_param_lock.acquire()
+            self._ref_cnt -= 1
+            # The last thread to exit is responsible for restoring the working directory and sys.path.
+            if self._ref_cnt == 0:
+                if self._old_python_path is not None:
+                    sys.path.clear()
+                    for item in self._old_python_path:
+                        sys.path.append(item)
+                    self._old_python_path = None
+                if self._old_working_dir is not None:
+                    os.chdir(self._old_working_dir)
+                    self._old_working_dir = None
+            self._parse_param_lock.release()
             if fn_log_handler:
                 fn_log_handler.close()
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 9db27be..1e10a0b 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1749,24 +1749,12 @@ class TableEnvironment(object):
 
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         def startup_loopback_server():
-            from pyflink.common import Configuration
-            _j_config = 
jvm.org.apache.flink.python.util.PythonConfigUtil.getMergedConfig(
-                self._get_j_env(), self.get_config()._j_table_config)
-            config = Configuration(j_configuration=_j_config)
-            parallelism = int(config.get_string("parallelism.default", "1"))
-
-            if parallelism > 1 and 
config.contains_key(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
-                import logging
-                logging.warning("Lookback mode is disabled as python archives 
are used and the "
-                                "parallelism of the job is greater than 1. The 
Python user-defined "
-                                "functions will be executed in an independent 
Python process.")
-            else:
-                from pyflink.fn_execution.beam.beam_worker_pool_service import 
\
-                    BeamFnLoopbackWorkerPoolServicer
+            from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                BeamFnLoopbackWorkerPoolServicer
 
-                j_env = jvm.System.getenv()
-                get_field_value(j_env, "m").put(
-                    'PYFLINK_LOOPBACK_SERVER_ADDRESS', 
BeamFnLoopbackWorkerPoolServicer().start())
+            j_env = jvm.System.getenv()
+            get_field_value(j_env, "m").put(
+                'PYFLINK_LOOPBACK_SERVER_ADDRESS', 
BeamFnLoopbackWorkerPoolServicer().start())
 
         python_worker_execution_mode = None
         if hasattr(self, "_python_worker_execution_mode"):
diff --git a/flink-python/pyflink/table/tests/test_dependency.py 
b/flink-python/pyflink/table/tests/test_dependency.py
index 27ebf6b..5a8e7ab 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -160,7 +160,6 @@ class StreamDependencyTests(DependencyTests, 
PyFlinkStreamTableTestCase):
             with open("data/data.txt", 'r') as f:
                 return i + int(f.read())
 
-        
self.t_env.get_config().get_configuration().set_string("parallelism.default", 
"1")
         self.t_env.create_temporary_system_function("add_from_file",
                                                     udf(add_from_file, 
DataTypes.BIGINT(),
                                                         DataTypes.BIGINT()))
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
 
b/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
index 41e29da..ab8413a 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
@@ -20,6 +20,8 @@ package org.apache.flink.python.env.beam;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.python.env.ProcessPythonEnvironment;
 import org.apache.flink.python.env.PythonDependencyInfo;
 import org.apache.flink.python.env.PythonEnvironment;
@@ -27,7 +29,9 @@ import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.util.CompressionUtils;
 import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
 
@@ -35,6 +39,8 @@ import org.codehaus.commons.nullanalysis.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -51,6 +57,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.flink.python.util.PythonDependencyUtils.PARAM_DELIMITER;
 
@@ -85,43 +92,44 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
     private static final long CHECK_INTERVAL = 20;
     private static final long CHECK_TIMEOUT = 1000;
 
-    private transient String baseDirectory;
-
-    /** Directory for storing the installation result of the requirements 
file. */
-    private transient String requirementsDirectory;
-
-    /** Directory for storing the extracted result of the archive files. */
-    private transient String archivesDirectory;
-
-    /** Directory for storing the uploaded python files. */
-    private transient String filesDirectory;
-
     private transient Thread shutdownHook;
 
+    private transient PythonEnvResources.PythonLeasedResource resource;
+
     @NotNull private final PythonDependencyInfo dependencyInfo;
     @NotNull private final Map<String, String> systemEnv;
     @NotNull private final String[] tmpDirectories;
+    @NotNull private final JobID jobID;
 
     public ProcessPythonEnvironmentManager(
             @NotNull PythonDependencyInfo dependencyInfo,
             @NotNull String[] tmpDirectories,
-            @NotNull Map<String, String> systemEnv) {
+            @NotNull Map<String, String> systemEnv,
+            @NotNull JobID jobID) {
         this.dependencyInfo = Objects.requireNonNull(dependencyInfo);
         this.tmpDirectories = Objects.requireNonNull(tmpDirectories);
         this.systemEnv = Objects.requireNonNull(systemEnv);
+        this.jobID = Objects.requireNonNull(jobID);
     }
 
     @Override
     public void open() throws Exception {
-        baseDirectory = createBaseDirectory(tmpDirectories);
-        archivesDirectory = String.join(File.separator, baseDirectory, 
PYTHON_ARCHIVES_DIR);
-        requirementsDirectory = String.join(File.separator, baseDirectory, 
PYTHON_REQUIREMENTS_DIR);
-        filesDirectory = String.join(File.separator, baseDirectory, 
PYTHON_FILES_DIR);
-
-        File baseDirectoryFile = new File(baseDirectory);
-        if (!baseDirectoryFile.exists() && !baseDirectoryFile.mkdir()) {
-            throw new IOException("Could not create the base directory: " + 
baseDirectory);
-        }
+        resource =
+                PythonEnvResources.getOrAllocateSharedResource(
+                        jobID,
+                        jobID -> {
+                            String baseDirectory = 
createBaseDirectory(tmpDirectories);
+
+                            File baseDirectoryFile = new File(baseDirectory);
+                            if (!baseDirectoryFile.exists() && 
!baseDirectoryFile.mkdir()) {
+                                throw new IOException(
+                                        "Could not create the base directory: 
" + baseDirectory);
+                            }
+
+                            Map<String, String> env = 
constructEnvironmentVariables(baseDirectory);
+                            installRequirements(baseDirectory, env);
+                            return Tuple2.of(baseDirectory, env);
+                        });
         shutdownHook =
                 ShutdownHookUtil.addShutdownHook(
                         this, 
ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
@@ -130,29 +138,7 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
     @Override
     public void close() throws Exception {
         try {
-            int retries = 0;
-            while (true) {
-                try {
-                    FileUtils.deleteDirectory(new File(baseDirectory));
-                    break;
-                } catch (Throwable t) {
-                    retries++;
-                    if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
-                        LOG.warn(
-                                String.format(
-                                        "Failed to delete the working 
directory %s of the Python UDF worker. Retrying...",
-                                        baseDirectory),
-                                t);
-                    } else {
-                        LOG.warn(
-                                String.format(
-                                        "Failed to delete the working 
directory %s of the Python UDF worker.",
-                                        baseDirectory),
-                                t);
-                        break;
-                    }
-                }
-            }
+            PythonEnvResources.release(jobID);
         } finally {
             if (shutdownHook != null) {
                 ShutdownHookUtil.removeShutdownHook(
@@ -163,18 +149,9 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
     }
 
     @Override
-    public PythonEnvironment createEnvironment() throws IOException {
-        Map<String, String> env = constructEnvironmentVariables();
+    public PythonEnvironment createEnvironment() throws Exception {
+        HashMap<String, String> env = new HashMap<>(resource.env);
 
-        if (dependencyInfo.getRequirementsFilePath().isPresent()) {
-            LOG.info("Trying to pip install the Python requirements...");
-            PythonEnvironmentManagerUtils.pipInstallRequirements(
-                    dependencyInfo.getRequirementsFilePath().get(),
-                    dependencyInfo.getRequirementsCacheDir().orElse(null),
-                    requirementsDirectory,
-                    dependencyInfo.getPythonExec(),
-                    env);
-        }
         String runnerScript =
                 PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(
                         dependencyInfo.getPythonExec(), env);
@@ -192,7 +169,8 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
     public String createRetrievalToken() throws IOException {
         File retrievalToken =
                 new File(
-                        baseDirectory, "retrieval_token_" + 
UUID.randomUUID().toString() + ".json");
+                        resource.baseDirectory,
+                        "retrieval_token_" + UUID.randomUUID().toString() + 
".json");
         if (retrievalToken.createNewFile()) {
             final DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(retrievalToken));
             dos.writeBytes("{\"manifest\": {}}");
@@ -216,15 +194,15 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
      * @return The environment variables which contain the paths of the python 
dependencies.
      */
     @VisibleForTesting
-    Map<String, String> constructEnvironmentVariables()
+    Map<String, String> constructEnvironmentVariables(String baseDirectory)
             throws IOException, IllegalArgumentException {
         Map<String, String> env = new HashMap<>(this.systemEnv);
 
-        constructFilesDirectory(env);
+        constructFilesDirectory(env, baseDirectory);
 
-        constructArchivesDirectory(env);
+        constructArchivesDirectory(env, baseDirectory);
 
-        constructRequirementsDirectory(env);
+        constructRequirementsDirectory(env, baseDirectory);
 
         // set BOOT_LOG_DIR.
         env.put("BOOT_LOG_DIR", baseDirectory);
@@ -243,13 +221,34 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
         return env;
     }
 
+    @VisibleForTesting
+    void installRequirements(String baseDirectory, Map<String, String> env) 
throws IOException {
+        // Directory for storing the installation result of the requirements 
file.
+        String requirementsDirectory =
+                String.join(File.separator, baseDirectory, 
PYTHON_REQUIREMENTS_DIR);
+        if (dependencyInfo.getRequirementsFilePath().isPresent()) {
+            LOG.info("Trying to pip install the Python requirements...");
+            PythonEnvironmentManagerUtils.pipInstallRequirements(
+                    dependencyInfo.getRequirementsFilePath().get(),
+                    dependencyInfo.getRequirementsCacheDir().orElse(null),
+                    requirementsDirectory,
+                    dependencyInfo.getPythonExec(),
+                    env);
+        }
+    }
+
     public void setEnvironmentVariable(String key, String value) {
         this.systemEnv.put(key, value);
     }
 
-    private void constructFilesDirectory(Map<String, String> env) throws 
IOException {
+    private void constructFilesDirectory(Map<String, String> env, String 
baseDirectory)
+            throws IOException {
         // link or copy python files to filesDirectory and add them to 
PYTHONPATH
         List<String> pythonFilePaths = new ArrayList<>();
+
+        // Directory for storing the uploaded python files.
+        String filesDirectory = String.join(File.separator, baseDirectory, 
PYTHON_FILES_DIR);
+
         for (Map.Entry<String, String> entry : 
dependencyInfo.getPythonFiles().entrySet()) {
             // The origin file name will be wiped when downloaded from the 
distributed cache,
             // restore the origin name to
@@ -318,7 +317,11 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
         LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH"));
     }
 
-    private void constructArchivesDirectory(Map<String, String> env) throws 
IOException {
+    private void constructArchivesDirectory(Map<String, String> env, String 
baseDirectory)
+            throws IOException {
+        // Directory for storing the extracted result of the archive files.
+        String archivesDirectory = String.join(File.separator, baseDirectory, 
PYTHON_ARCHIVES_DIR);
+
         if (!dependencyInfo.getArchives().isEmpty()) {
             // set the archives directory as the working directory, then user 
could access the
             // content of the archives via relative path
@@ -347,10 +350,13 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
         }
     }
 
-    private void constructRequirementsDirectory(Map<String, String> env) 
throws IOException {
+    private void constructRequirementsDirectory(Map<String, String> env, 
String baseDirectory)
+            throws IOException {
         // set the requirements file and the dependencies specified by the 
requirements file will be
         // installed in
         // boot.py during initialization
+        String requirementsDirectory =
+                String.join(File.separator, baseDirectory, 
PYTHON_REQUIREMENTS_DIR);
         if (dependencyInfo.getRequirementsFilePath().isPresent()) {
             File requirementsDirectoryFile = new File(requirementsDirectory);
             if (!requirementsDirectoryFile.mkdirs()) {
@@ -382,12 +388,18 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
 
     @VisibleForTesting
     String getBaseDirectory() {
-        return baseDirectory;
+        return resource.baseDirectory;
+    }
+
+    @VisibleForTesting
+    Map<String, String> getPythonEnv() {
+        return resource.env;
     }
 
     @Override
     public String getBootLog() throws Exception {
-        File bootLogFile = new File(baseDirectory + File.separator + 
"flink-python-udf-boot.log");
+        File bootLogFile =
+                new File(resource.baseDirectory + File.separator + 
"flink-python-udf-boot.log");
         String msg = "Failed to create stage bundle factory!";
         if (bootLogFile.exists()) {
             byte[] output = Files.readAllBytes(bootLogFile.toPath());
@@ -430,4 +442,117 @@ public final class ProcessPythonEnvironmentManager 
implements PythonEnvironmentM
                         + Arrays.toString(tmpDirectories)
                         + "' for storing the generated files of python 
dependency.");
     }
+
+    private static final class PythonEnvResources {
+
+        private static final ReentrantLock lock = new ReentrantLock();
+
+        @GuardedBy("lock")
+        private static final Map<Object, PythonLeasedResource> 
reservedResources = new HashMap<>();
+
+        static PythonLeasedResource getOrAllocateSharedResource(
+                Object type,
+                FunctionWithException<Object, Tuple2<String, Map<String, 
String>>, Exception>
+                        initializer)
+                throws Exception {
+            try {
+                lock.lockInterruptibly();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while preparing python environment.");
+            }
+
+            try {
+                PythonLeasedResource resource = reservedResources.get(type);
+                if (resource == null) {
+                    resource = createResource(initializer, type);
+                    reservedResources.put(type, resource);
+                }
+                resource.incRef();
+                return resource;
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        public static void release(JobID jobID) throws Exception {
+            lock.lock();
+            try {
+                final PythonLeasedResource resource = 
reservedResources.get(jobID);
+                if (resource == null) {
+                    return;
+                }
+                resource.decRef();
+                if (resource.refCount == 0) {
+                    reservedResources.remove(jobID);
+                    resource.close();
+                }
+
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        private static PythonLeasedResource createResource(
+                FunctionWithException<Object, Tuple2<String, Map<String, 
String>>, Exception>
+                        initializer,
+                Object type)
+                throws Exception {
+            Tuple2<String, Map<String, String>> resource = 
initializer.apply(type);
+            String baseDirectory = resource.f0;
+            Map<String, String> env = resource.f1;
+            return new PythonLeasedResource(baseDirectory, env);
+        }
+
+        private static final class PythonLeasedResource implements 
AutoCloseable {
+            private final Map<String, String> env;
+
+            /** The base directory of the Python Environment. */
+            private final String baseDirectory;
+
+            /** Keep track of the number of threads sharing this Python environment resource. */
+            private long refCount = 0;
+
+            PythonLeasedResource(String baseDirectory, Map<String, String> 
env) {
+                this.baseDirectory = baseDirectory;
+                this.env = env;
+            }
+
+            void incRef() {
+                this.refCount += 1;
+            }
+
+            void decRef() {
+                Preconditions.checkState(refCount > 0);
+                this.refCount -= 1;
+            }
+
+            @Override
+            public void close() throws Exception {
+                int retries = 0;
+                while (true) {
+                    try {
+                        FileUtils.deleteDirectory(new File(baseDirectory));
+                        break;
+                    } catch (Throwable t) {
+                        retries++;
+                        if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
+                            LOG.warn(
+                                    String.format(
+                                            "Failed to delete the working 
directory %s of the Python UDF worker. Retrying...",
+                                            baseDirectory),
+                                    t);
+                        } else {
+                            LOG.warn(
+                                    String.format(
+                                            "Failed to delete the working 
directory %s of the Python UDF worker.",
+                                            baseDirectory),
+                                    t);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 7b5ab9a..d321370 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -381,7 +381,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
             return new ProcessPythonEnvironmentManager(
                     dependencyInfo,
                     
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
-                    new HashMap<>(System.getenv()));
+                    new HashMap<>(System.getenv()),
+                    getRuntimeContext().getJobId());
         } else {
             throw new UnsupportedOperationException(
                     String.format(
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
index 5454a5a..b1a16be 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.python.env.beam;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.python.env.PythonDependencyInfo;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.OperatingSystem;
@@ -41,6 +42,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import static 
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYFLINK_GATEWAY_DISABLED;
 import static 
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_ARCHIVES_DIR;
@@ -172,10 +174,9 @@ public class ProcessPythonEnvironmentManagerTest {
         try (ProcessPythonEnvironmentManager environmentManager =
                 createBasicPythonEnvironmentManager(dependencyInfo)) {
             environmentManager.open();
-            Map<String, String> environmentVariable =
-                    environmentManager.constructEnvironmentVariables();
-
             String baseDir = environmentManager.getBaseDirectory();
+            Map<String, String> environmentVariable = 
environmentManager.getPythonEnv();
+
             String[] expectedUserPythonPaths =
                     new String[] {
                         String.join(File.separator, baseDir, PYTHON_FILES_DIR, 
"zip0", "test_zip"),
@@ -242,12 +243,21 @@ public class ProcessPythonEnvironmentManagerTest {
 
         try (ProcessPythonEnvironmentManager environmentManager =
                 createBasicPythonEnvironmentManager(dependencyInfo)) {
-            environmentManager.open();
+            File baseDirectory = new File(tmpDir, "python-dist-" + 
UUID.randomUUID().toString());
+            if (!baseDirectory.mkdirs()) {
+                throw new IOException(
+                        "Could not find a unique directory name in '"
+                                + tmpDir
+                                + "' for storing the generated files of python 
dependency.");
+            }
+            String tmpBase = baseDirectory.getAbsolutePath();
             Map<String, String> environmentVariable =
-                    environmentManager.constructEnvironmentVariables();
+                    environmentManager.constructEnvironmentVariables(tmpBase);
 
-            String tmpBase = environmentManager.getBaseDirectory();
-            Map<String, String> expected = 
getBasicExpectedEnv(environmentManager);
+            Map<String, String> expected = new HashMap<>();
+            expected.put("python", "python");
+            expected.put("BOOT_LOG_DIR", tmpBase);
+            expected.put(PYFLINK_GATEWAY_DISABLED, "true");
             expected.put(PYTHON_REQUIREMENTS_FILE, String.join(File.separator, 
tmpDir, "file0"));
             expected.put(PYTHON_REQUIREMENTS_CACHE, 
String.join(File.separator, tmpDir, "dir0"));
             expected.put(
@@ -269,10 +279,9 @@ public class ProcessPythonEnvironmentManagerTest {
         try (ProcessPythonEnvironmentManager environmentManager =
                 createBasicPythonEnvironmentManager(dependencyInfo)) {
             environmentManager.open();
-            Map<String, String> environmentVariable =
-                    environmentManager.constructEnvironmentVariables();
-
             String tmpBase = environmentManager.getBaseDirectory();
+            Map<String, String> environmentVariable = 
environmentManager.getPythonEnv();
+
             Map<String, String> expected = 
getBasicExpectedEnv(environmentManager);
             expected.put(
                     PYTHON_WORKING_DIR, String.join(File.separator, tmpBase, 
PYTHON_ARCHIVES_DIR));
@@ -297,8 +306,7 @@ public class ProcessPythonEnvironmentManagerTest {
         try (ProcessPythonEnvironmentManager environmentManager =
                 createBasicPythonEnvironmentManager(dependencyInfo)) {
             environmentManager.open();
-            Map<String, String> environmentVariable =
-                    environmentManager.constructEnvironmentVariables();
+            Map<String, String> environmentVariable = 
environmentManager.getPythonEnv();
 
             Map<String, String> expected = 
getBasicExpectedEnv(environmentManager);
             expected.put("python", "/usr/local/bin/python");
@@ -315,7 +323,7 @@ public class ProcessPythonEnvironmentManagerTest {
 
         try (ProcessPythonEnvironmentManager environmentManager =
                 new ProcessPythonEnvironmentManager(
-                        dependencyInfo, new String[] {tmpDir}, sysEnv)) {
+                        dependencyInfo, new String[] {tmpDir}, sysEnv, new 
JobID())) {
             environmentManager.open();
             String retrievalToken = environmentManager.createRetrievalToken();
 
@@ -335,9 +343,11 @@ public class ProcessPythonEnvironmentManagerTest {
 
         try (ProcessPythonEnvironmentManager environmentManager =
                 new ProcessPythonEnvironmentManager(
-                        dependencyInfo, new String[] {tmpDir}, new 
HashMap<>())) {
+                        dependencyInfo, new String[] {tmpDir}, new 
HashMap<>(), new JobID())) {
             environmentManager.open();
-            Map<String, String> env = 
environmentManager.constructEnvironmentVariables();
+            Map<String, String> env =
+                    environmentManager.constructEnvironmentVariables(
+                            environmentManager.getBaseDirectory());
             Map<String, String> expected = 
getBasicExpectedEnv(environmentManager);
             expected.put("BOOT_LOG_DIR", 
environmentManager.getBaseDirectory());
             assertEquals(expected, env);
@@ -414,6 +424,6 @@ public class ProcessPythonEnvironmentManagerTest {
     private static ProcessPythonEnvironmentManager 
createBasicPythonEnvironmentManager(
             PythonDependencyInfo dependencyInfo) {
         return new ProcessPythonEnvironmentManager(
-                dependencyInfo, new String[] {tmpDir}, new HashMap<>());
+                dependencyInfo, new String[] {tmpDir}, new HashMap<>(), new 
JobID());
     }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
index 0f93c85..82ac09b 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.runtime.utils;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.python.env.PythonDependencyInfo;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
@@ -91,6 +92,7 @@ public final class PythonTestUtils {
         return new ProcessPythonEnvironmentManager(
                 new PythonDependencyInfo(new HashMap<>(), null, null, new 
HashMap<>(), "python"),
                 new String[] {System.getProperty("java.io.tmpdir")},
-                env);
+                env,
+                new JobID());
     }
 }

Reply via email to