This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fb5d1b8 [FLINK-24123][python] Optimize the python operator instances
of the same job to share one python environment resources in the same jvm
fb5d1b8 is described below
commit fb5d1b84a8faeada0d745204381b519ac689a348
Author: huangxingbo <[email protected]>
AuthorDate: Tue Sep 7 17:24:15 2021 +0800
[FLINK-24123][python] Optimize the python operator instances of the same
job to share one python environment resources in the same jvm
This closes #17180.
---
docs/content.zh/docs/dev/python/debugging.md | 2 -
docs/content/docs/dev/python/debugging.md | 2 -
flink-python/dev/integration_test.sh | 3 +-
.../datastream/stream_execution_environment.py | 19 +-
.../fn_execution/beam/beam_worker_pool_service.py | 41 +++-
flink-python/pyflink/table/table_environment.py | 22 +-
.../pyflink/table/tests/test_dependency.py | 1 -
.../env/beam/ProcessPythonEnvironmentManager.java | 255 +++++++++++++++------
.../python/AbstractPythonFunctionOperator.java | 3 +-
.../beam/ProcessPythonEnvironmentManagerTest.java | 42 ++--
.../flink/table/runtime/utils/PythonTestUtils.java | 4 +-
11 files changed, 261 insertions(+), 133 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/debugging.md
b/docs/content.zh/docs/dev/python/debugging.md
index b7a5d99..2d54424 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -85,8 +85,6 @@ $ python -c "import pyflink;import
os;print(os.path.dirname(os.path.abspath(pyfl
你可以直接在 PyCharm 等 IDE 调试你的 Python 函数。
-**注意:** 当前,如果你使用了配置 `python-archives`,并且作业的并发度是大于`1`的,只能够使用[远程调试](#远程调试)的方式。
-
### 远程调试
你可以利用PyCharm提供的[`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/)工具进行Python
UDF的调试
diff --git a/docs/content/docs/dev/python/debugging.md
b/docs/content/docs/dev/python/debugging.md
index c6963ef..cd58365 100644
--- a/docs/content/docs/dev/python/debugging.md
+++ b/docs/content/docs/dev/python/debugging.md
@@ -89,8 +89,6 @@ $ python -c "import pyflink;import
os;print(os.path.dirname(os.path.abspath(pyfl
You can debug your python functions directly in IDEs such as PyCharm.
-**Note:** Currently, if you use `python-archives` in the job and the
parallelism of the job is greater than `1`, you can only use [remote
debug](#remote-debug) mode.
-
### Remote Debug
You can make use of the
[`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/) tool of PyCharm to
debug Python UDFs.
diff --git a/flink-python/dev/integration_test.sh
b/flink-python/dev/integration_test.sh
index 36c0cae..c358632 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -37,8 +37,7 @@ FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
test_module "common"
# test datastream module
-test_module "datastream" "--ignore
$FLINK_PYTHON_DIR/pyflink/datastream/tests/test_stream_execution_environment.py"
-pytest
"$FLINK_PYTHON_DIR/pyflink/datastream/tests/test_stream_execution_environment.py"
+test_module "datastream"
# test fn_execution module
test_module "fn_execution"
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py
b/flink-python/pyflink/datastream/stream_execution_environment.py
index 4f6a4ee..3dbd3e8 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -892,21 +892,12 @@ class StreamExecutionEnvironment(object):
j_configuration =
get_j_env_configuration(self._j_stream_execution_environment)
def startup_loopback_server():
+ from pyflink.fn_execution.beam.beam_worker_pool_service import \
+ BeamFnLoopbackWorkerPoolServicer
jvm = gateway.jvm
- env_config = JPythonConfigUtil.getEnvironmentConfig(
- self._j_stream_execution_environment)
- parallelism = self.get_parallelism()
- if parallelism > 1 and
env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
- import logging
- logging.warning("Loopback mode is disabled as python archives
are used and the "
- "parallelism of the job is greater than 1. The
Python user-defined "
- "functions will be executed in an independent
Python process.")
- else:
- from pyflink.fn_execution.beam.beam_worker_pool_service import
\
- BeamFnLoopbackWorkerPoolServicer
- j_env = jvm.System.getenv()
- get_field_value(j_env, "m").put(
- 'PYFLINK_LOOPBACK_SERVER_ADDRESS',
BeamFnLoopbackWorkerPoolServicer().start())
+ j_env = jvm.System.getenv()
+ get_field_value(j_env, "m").put(
+ 'PYFLINK_LOOPBACK_SERVER_ADDRESS',
BeamFnLoopbackWorkerPoolServicer().start())
python_worker_execution_mode = None
if hasattr(self, "_python_worker_execution_mode"):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
index 2d61a55..3d0e868 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
@@ -62,6 +62,9 @@ class
BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
def __init__(self):
self._parse_param_lock = threading.Lock()
self._worker_address = None
+ self._old_working_dir = None
+ self._old_python_path = None
+ self._ref_cnt = 0
def start(self):
if not self._worker_address:
@@ -91,17 +94,20 @@ class
BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
def _start_sdk_worker_main(self, start_worker_request:
beam_fn_api_pb2.StartWorkerRequest):
params = start_worker_request.params
- base_dir = None
self._parse_param_lock.acquire()
- if 'PYTHONPATH' in params:
- python_path_list = params['PYTHONPATH'].split(':')
- python_path_list.reverse()
- for path in python_path_list:
- sys.path.insert(0, path)
- if '_PYTHON_WORKING_DIR' in params:
- base_dir = os.getcwd()
- os.chdir(params['_PYTHON_WORKING_DIR'])
- os.environ.update(params)
+            # The first thread to start is responsible for preparing the execution
environment.
+ if not self._ref_cnt:
+ if 'PYTHONPATH' in params:
+ self._old_python_path = sys.path[:]
+ python_path_list = params['PYTHONPATH'].split(':')
+ python_path_list.reverse()
+ for path in python_path_list:
+ sys.path.insert(0, path)
+ if '_PYTHON_WORKING_DIR' in params:
+ self._old_working_dir = os.getcwd()
+ os.chdir(params['_PYTHON_WORKING_DIR'])
+ os.environ.update(params)
+ self._ref_cnt += 1
self._parse_param_lock.release()
# read job information from provision stub
@@ -151,7 +157,18 @@ class
BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
_LOGGER.exception('Python sdk harness failed: ')
raise
finally:
- if base_dir:
- os.chdir(base_dir)
+ self._parse_param_lock.acquire()
+ self._ref_cnt -= 1
+            # The last thread to exit is responsible for reverting the working
directory and sys.path.
+ if self._ref_cnt == 0:
+ if self._old_python_path is not None:
+ sys.path.clear()
+ for item in self._old_python_path:
+ sys.path.append(item)
+ self._old_python_path = None
+ if self._old_working_dir is not None:
+ os.chdir(self._old_working_dir)
+ self._old_working_dir = None
+ self._parse_param_lock.release()
if fn_log_handler:
fn_log_handler.close()
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index 9db27be..1e10a0b 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1749,24 +1749,12 @@ class TableEnvironment(object):
# start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
def startup_loopback_server():
- from pyflink.common import Configuration
- _j_config =
jvm.org.apache.flink.python.util.PythonConfigUtil.getMergedConfig(
- self._get_j_env(), self.get_config()._j_table_config)
- config = Configuration(j_configuration=_j_config)
- parallelism = int(config.get_string("parallelism.default", "1"))
-
- if parallelism > 1 and
config.contains_key(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
- import logging
- logging.warning("Lookback mode is disabled as python archives
are used and the "
- "parallelism of the job is greater than 1. The
Python user-defined "
- "functions will be executed in an independent
Python process.")
- else:
- from pyflink.fn_execution.beam.beam_worker_pool_service import
\
- BeamFnLoopbackWorkerPoolServicer
+ from pyflink.fn_execution.beam.beam_worker_pool_service import \
+ BeamFnLoopbackWorkerPoolServicer
- j_env = jvm.System.getenv()
- get_field_value(j_env, "m").put(
- 'PYFLINK_LOOPBACK_SERVER_ADDRESS',
BeamFnLoopbackWorkerPoolServicer().start())
+ j_env = jvm.System.getenv()
+ get_field_value(j_env, "m").put(
+ 'PYFLINK_LOOPBACK_SERVER_ADDRESS',
BeamFnLoopbackWorkerPoolServicer().start())
python_worker_execution_mode = None
if hasattr(self, "_python_worker_execution_mode"):
diff --git a/flink-python/pyflink/table/tests/test_dependency.py
b/flink-python/pyflink/table/tests/test_dependency.py
index 27ebf6b..5a8e7ab 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -160,7 +160,6 @@ class StreamDependencyTests(DependencyTests,
PyFlinkStreamTableTestCase):
with open("data/data.txt", 'r') as f:
return i + int(f.read())
-
self.t_env.get_config().get_configuration().set_string("parallelism.default",
"1")
self.t_env.create_temporary_system_function("add_from_file",
udf(add_from_file,
DataTypes.BIGINT(),
DataTypes.BIGINT()))
diff --git
a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
b/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
index 41e29da..ab8413a 100644
---
a/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
+++
b/flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
@@ -20,6 +20,8 @@ package org.apache.flink.python.env.beam;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.python.env.ProcessPythonEnvironment;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironment;
@@ -27,7 +29,9 @@ import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.util.CompressionUtils;
import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
@@ -35,6 +39,8 @@ import org.codehaus.commons.nullanalysis.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
+
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -51,6 +57,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.flink.python.util.PythonDependencyUtils.PARAM_DELIMITER;
@@ -85,43 +92,44 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
private static final long CHECK_INTERVAL = 20;
private static final long CHECK_TIMEOUT = 1000;
- private transient String baseDirectory;
-
- /** Directory for storing the installation result of the requirements
file. */
- private transient String requirementsDirectory;
-
- /** Directory for storing the extracted result of the archive files. */
- private transient String archivesDirectory;
-
- /** Directory for storing the uploaded python files. */
- private transient String filesDirectory;
-
private transient Thread shutdownHook;
+ private transient PythonEnvResources.PythonLeasedResource resource;
+
@NotNull private final PythonDependencyInfo dependencyInfo;
@NotNull private final Map<String, String> systemEnv;
@NotNull private final String[] tmpDirectories;
+ @NotNull private final JobID jobID;
public ProcessPythonEnvironmentManager(
@NotNull PythonDependencyInfo dependencyInfo,
@NotNull String[] tmpDirectories,
- @NotNull Map<String, String> systemEnv) {
+ @NotNull Map<String, String> systemEnv,
+ @NotNull JobID jobID) {
this.dependencyInfo = Objects.requireNonNull(dependencyInfo);
this.tmpDirectories = Objects.requireNonNull(tmpDirectories);
this.systemEnv = Objects.requireNonNull(systemEnv);
+ this.jobID = Objects.requireNonNull(jobID);
}
@Override
public void open() throws Exception {
- baseDirectory = createBaseDirectory(tmpDirectories);
- archivesDirectory = String.join(File.separator, baseDirectory,
PYTHON_ARCHIVES_DIR);
- requirementsDirectory = String.join(File.separator, baseDirectory,
PYTHON_REQUIREMENTS_DIR);
- filesDirectory = String.join(File.separator, baseDirectory,
PYTHON_FILES_DIR);
-
- File baseDirectoryFile = new File(baseDirectory);
- if (!baseDirectoryFile.exists() && !baseDirectoryFile.mkdir()) {
- throw new IOException("Could not create the base directory: " +
baseDirectory);
- }
+ resource =
+ PythonEnvResources.getOrAllocateSharedResource(
+ jobID,
+ jobID -> {
+ String baseDirectory =
createBaseDirectory(tmpDirectories);
+
+ File baseDirectoryFile = new File(baseDirectory);
+ if (!baseDirectoryFile.exists() &&
!baseDirectoryFile.mkdir()) {
+ throw new IOException(
+ "Could not create the base directory:
" + baseDirectory);
+ }
+
+ Map<String, String> env =
constructEnvironmentVariables(baseDirectory);
+ installRequirements(baseDirectory, env);
+ return Tuple2.of(baseDirectory, env);
+ });
shutdownHook =
ShutdownHookUtil.addShutdownHook(
this,
ProcessPythonEnvironmentManager.class.getSimpleName(), LOG);
@@ -130,29 +138,7 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
@Override
public void close() throws Exception {
try {
- int retries = 0;
- while (true) {
- try {
- FileUtils.deleteDirectory(new File(baseDirectory));
- break;
- } catch (Throwable t) {
- retries++;
- if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
- LOG.warn(
- String.format(
- "Failed to delete the working
directory %s of the Python UDF worker. Retrying...",
- baseDirectory),
- t);
- } else {
- LOG.warn(
- String.format(
- "Failed to delete the working
directory %s of the Python UDF worker.",
- baseDirectory),
- t);
- break;
- }
- }
- }
+ PythonEnvResources.release(jobID);
} finally {
if (shutdownHook != null) {
ShutdownHookUtil.removeShutdownHook(
@@ -163,18 +149,9 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
}
@Override
- public PythonEnvironment createEnvironment() throws IOException {
- Map<String, String> env = constructEnvironmentVariables();
+ public PythonEnvironment createEnvironment() throws Exception {
+ HashMap<String, String> env = new HashMap<>(resource.env);
- if (dependencyInfo.getRequirementsFilePath().isPresent()) {
- LOG.info("Trying to pip install the Python requirements...");
- PythonEnvironmentManagerUtils.pipInstallRequirements(
- dependencyInfo.getRequirementsFilePath().get(),
- dependencyInfo.getRequirementsCacheDir().orElse(null),
- requirementsDirectory,
- dependencyInfo.getPythonExec(),
- env);
- }
String runnerScript =
PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(
dependencyInfo.getPythonExec(), env);
@@ -192,7 +169,8 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
public String createRetrievalToken() throws IOException {
File retrievalToken =
new File(
- baseDirectory, "retrieval_token_" +
UUID.randomUUID().toString() + ".json");
+ resource.baseDirectory,
+ "retrieval_token_" + UUID.randomUUID().toString() +
".json");
if (retrievalToken.createNewFile()) {
final DataOutputStream dos = new DataOutputStream(new
FileOutputStream(retrievalToken));
dos.writeBytes("{\"manifest\": {}}");
@@ -216,15 +194,15 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
* @return The environment variables which contain the paths of the python
dependencies.
*/
@VisibleForTesting
- Map<String, String> constructEnvironmentVariables()
+ Map<String, String> constructEnvironmentVariables(String baseDirectory)
throws IOException, IllegalArgumentException {
Map<String, String> env = new HashMap<>(this.systemEnv);
- constructFilesDirectory(env);
+ constructFilesDirectory(env, baseDirectory);
- constructArchivesDirectory(env);
+ constructArchivesDirectory(env, baseDirectory);
- constructRequirementsDirectory(env);
+ constructRequirementsDirectory(env, baseDirectory);
// set BOOT_LOG_DIR.
env.put("BOOT_LOG_DIR", baseDirectory);
@@ -243,13 +221,34 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
return env;
}
+ @VisibleForTesting
+ void installRequirements(String baseDirectory, Map<String, String> env)
throws IOException {
+ // Directory for storing the installation result of the requirements
file.
+ String requirementsDirectory =
+ String.join(File.separator, baseDirectory,
PYTHON_REQUIREMENTS_DIR);
+ if (dependencyInfo.getRequirementsFilePath().isPresent()) {
+ LOG.info("Trying to pip install the Python requirements...");
+ PythonEnvironmentManagerUtils.pipInstallRequirements(
+ dependencyInfo.getRequirementsFilePath().get(),
+ dependencyInfo.getRequirementsCacheDir().orElse(null),
+ requirementsDirectory,
+ dependencyInfo.getPythonExec(),
+ env);
+ }
+ }
+
public void setEnvironmentVariable(String key, String value) {
this.systemEnv.put(key, value);
}
- private void constructFilesDirectory(Map<String, String> env) throws
IOException {
+ private void constructFilesDirectory(Map<String, String> env, String
baseDirectory)
+ throws IOException {
// link or copy python files to filesDirectory and add them to
PYTHONPATH
List<String> pythonFilePaths = new ArrayList<>();
+
+ // Directory for storing the uploaded python files.
+ String filesDirectory = String.join(File.separator, baseDirectory,
PYTHON_FILES_DIR);
+
for (Map.Entry<String, String> entry :
dependencyInfo.getPythonFiles().entrySet()) {
// The origin file name will be wiped when downloaded from the
distributed cache,
// restore the origin name to
@@ -318,7 +317,11 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
LOG.info("PYTHONPATH of python worker: {}", env.get("PYTHONPATH"));
}
- private void constructArchivesDirectory(Map<String, String> env) throws
IOException {
+ private void constructArchivesDirectory(Map<String, String> env, String
baseDirectory)
+ throws IOException {
+ // Directory for storing the extracted result of the archive files.
+ String archivesDirectory = String.join(File.separator, baseDirectory,
PYTHON_ARCHIVES_DIR);
+
if (!dependencyInfo.getArchives().isEmpty()) {
// set the archives directory as the working directory, then user
could access the
// content of the archives via relative path
@@ -347,10 +350,13 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
}
}
- private void constructRequirementsDirectory(Map<String, String> env)
throws IOException {
+ private void constructRequirementsDirectory(Map<String, String> env,
String baseDirectory)
+ throws IOException {
// set the requirements file and the dependencies specified by the
requirements file will be
// installed in
// boot.py during initialization
+ String requirementsDirectory =
+ String.join(File.separator, baseDirectory,
PYTHON_REQUIREMENTS_DIR);
if (dependencyInfo.getRequirementsFilePath().isPresent()) {
File requirementsDirectoryFile = new File(requirementsDirectory);
if (!requirementsDirectoryFile.mkdirs()) {
@@ -382,12 +388,18 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
@VisibleForTesting
String getBaseDirectory() {
- return baseDirectory;
+ return resource.baseDirectory;
+ }
+
+ @VisibleForTesting
+ Map<String, String> getPythonEnv() {
+ return resource.env;
}
@Override
public String getBootLog() throws Exception {
- File bootLogFile = new File(baseDirectory + File.separator +
"flink-python-udf-boot.log");
+ File bootLogFile =
+ new File(resource.baseDirectory + File.separator +
"flink-python-udf-boot.log");
String msg = "Failed to create stage bundle factory!";
if (bootLogFile.exists()) {
byte[] output = Files.readAllBytes(bootLogFile.toPath());
@@ -430,4 +442,117 @@ public final class ProcessPythonEnvironmentManager
implements PythonEnvironmentM
+ Arrays.toString(tmpDirectories)
+ "' for storing the generated files of python
dependency.");
}
+
+ private static final class PythonEnvResources {
+
+ private static final ReentrantLock lock = new ReentrantLock();
+
+ @GuardedBy("lock")
+ private static final Map<Object, PythonLeasedResource>
reservedResources = new HashMap<>();
+
+ static PythonLeasedResource getOrAllocateSharedResource(
+ Object type,
+ FunctionWithException<Object, Tuple2<String, Map<String,
String>>, Exception>
+ initializer)
+ throws Exception {
+ try {
+ lock.lockInterruptibly();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while preparing the python
environment.");
+ }
+
+ try {
+ PythonLeasedResource resource = reservedResources.get(type);
+ if (resource == null) {
+ resource = createResource(initializer, type);
+ reservedResources.put(type, resource);
+ }
+ resource.incRef();
+ return resource;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public static void release(JobID jobID) throws Exception {
+ lock.lock();
+ try {
+ final PythonLeasedResource resource =
reservedResources.get(jobID);
+ if (resource == null) {
+ return;
+ }
+ resource.decRef();
+ if (resource.refCount == 0) {
+ reservedResources.remove(jobID);
+ resource.close();
+ }
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private static PythonLeasedResource createResource(
+ FunctionWithException<Object, Tuple2<String, Map<String,
String>>, Exception>
+ initializer,
+ Object type)
+ throws Exception {
+ Tuple2<String, Map<String, String>> resource =
initializer.apply(type);
+ String baseDirectory = resource.f0;
+ Map<String, String> env = resource.f1;
+ return new PythonLeasedResource(baseDirectory, env);
+ }
+
+ private static final class PythonLeasedResource implements
AutoCloseable {
+ private final Map<String, String> env;
+
+ /** The base directory of the Python Environment. */
+ private final String baseDirectory;
+
+            /** Keep track of the number of threads sharing this Python
environment resource. */
+ private long refCount = 0;
+
+ PythonLeasedResource(String baseDirectory, Map<String, String>
env) {
+ this.baseDirectory = baseDirectory;
+ this.env = env;
+ }
+
+ void incRef() {
+ this.refCount += 1;
+ }
+
+ void decRef() {
+ Preconditions.checkState(refCount > 0);
+ this.refCount -= 1;
+ }
+
+ @Override
+ public void close() throws Exception {
+ int retries = 0;
+ while (true) {
+ try {
+ FileUtils.deleteDirectory(new File(baseDirectory));
+ break;
+ } catch (Throwable t) {
+ retries++;
+ if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
+ LOG.warn(
+ String.format(
+ "Failed to delete the working
directory %s of the Python UDF worker. Retrying...",
+ baseDirectory),
+ t);
+ } else {
+ LOG.warn(
+ String.format(
+ "Failed to delete the working
directory %s of the Python UDF worker.",
+ baseDirectory),
+ t);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 7b5ab9a..d321370 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -381,7 +381,8 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
return new ProcessPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- new HashMap<>(System.getenv()));
+ new HashMap<>(System.getenv()),
+ getRuntimeContext().getJobId());
} else {
throw new UnsupportedOperationException(
String.format(
diff --git
a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
b/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
index 5454a5a..b1a16be 100644
---
a/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
+++
b/flink-python/src/test/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.python.env.beam;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.OperatingSystem;
@@ -41,6 +42,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import static
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYFLINK_GATEWAY_DISABLED;
import static
org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_ARCHIVES_DIR;
@@ -172,10 +174,9 @@ public class ProcessPythonEnvironmentManagerTest {
try (ProcessPythonEnvironmentManager environmentManager =
createBasicPythonEnvironmentManager(dependencyInfo)) {
environmentManager.open();
- Map<String, String> environmentVariable =
- environmentManager.constructEnvironmentVariables();
-
String baseDir = environmentManager.getBaseDirectory();
+ Map<String, String> environmentVariable =
environmentManager.getPythonEnv();
+
String[] expectedUserPythonPaths =
new String[] {
String.join(File.separator, baseDir, PYTHON_FILES_DIR,
"zip0", "test_zip"),
@@ -242,12 +243,21 @@ public class ProcessPythonEnvironmentManagerTest {
try (ProcessPythonEnvironmentManager environmentManager =
createBasicPythonEnvironmentManager(dependencyInfo)) {
- environmentManager.open();
+ File baseDirectory = new File(tmpDir, "python-dist-" +
UUID.randomUUID().toString());
+ if (!baseDirectory.mkdirs()) {
+ throw new IOException(
+ "Could not find a unique directory name in '"
+ + tmpDir
+ + "' for storing the generated files of python
dependency.");
+ }
+ String tmpBase = baseDirectory.getAbsolutePath();
Map<String, String> environmentVariable =
- environmentManager.constructEnvironmentVariables();
+ environmentManager.constructEnvironmentVariables(tmpBase);
- String tmpBase = environmentManager.getBaseDirectory();
- Map<String, String> expected =
getBasicExpectedEnv(environmentManager);
+ Map<String, String> expected = new HashMap<>();
+ expected.put("python", "python");
+ expected.put("BOOT_LOG_DIR", tmpBase);
+ expected.put(PYFLINK_GATEWAY_DISABLED, "true");
expected.put(PYTHON_REQUIREMENTS_FILE, String.join(File.separator,
tmpDir, "file0"));
expected.put(PYTHON_REQUIREMENTS_CACHE,
String.join(File.separator, tmpDir, "dir0"));
expected.put(
@@ -269,10 +279,9 @@ public class ProcessPythonEnvironmentManagerTest {
try (ProcessPythonEnvironmentManager environmentManager =
createBasicPythonEnvironmentManager(dependencyInfo)) {
environmentManager.open();
- Map<String, String> environmentVariable =
- environmentManager.constructEnvironmentVariables();
-
String tmpBase = environmentManager.getBaseDirectory();
+ Map<String, String> environmentVariable =
environmentManager.getPythonEnv();
+
Map<String, String> expected =
getBasicExpectedEnv(environmentManager);
expected.put(
PYTHON_WORKING_DIR, String.join(File.separator, tmpBase,
PYTHON_ARCHIVES_DIR));
@@ -297,8 +306,7 @@ public class ProcessPythonEnvironmentManagerTest {
try (ProcessPythonEnvironmentManager environmentManager =
createBasicPythonEnvironmentManager(dependencyInfo)) {
environmentManager.open();
- Map<String, String> environmentVariable =
- environmentManager.constructEnvironmentVariables();
+ Map<String, String> environmentVariable =
environmentManager.getPythonEnv();
Map<String, String> expected =
getBasicExpectedEnv(environmentManager);
expected.put("python", "/usr/local/bin/python");
@@ -315,7 +323,7 @@ public class ProcessPythonEnvironmentManagerTest {
try (ProcessPythonEnvironmentManager environmentManager =
new ProcessPythonEnvironmentManager(
- dependencyInfo, new String[] {tmpDir}, sysEnv)) {
+ dependencyInfo, new String[] {tmpDir}, sysEnv, new
JobID())) {
environmentManager.open();
String retrievalToken = environmentManager.createRetrievalToken();
@@ -335,9 +343,11 @@ public class ProcessPythonEnvironmentManagerTest {
try (ProcessPythonEnvironmentManager environmentManager =
new ProcessPythonEnvironmentManager(
- dependencyInfo, new String[] {tmpDir}, new
HashMap<>())) {
+ dependencyInfo, new String[] {tmpDir}, new
HashMap<>(), new JobID())) {
environmentManager.open();
- Map<String, String> env =
environmentManager.constructEnvironmentVariables();
+ Map<String, String> env =
+ environmentManager.constructEnvironmentVariables(
+ environmentManager.getBaseDirectory());
Map<String, String> expected =
getBasicExpectedEnv(environmentManager);
expected.put("BOOT_LOG_DIR",
environmentManager.getBaseDirectory());
assertEquals(expected, env);
@@ -414,6 +424,6 @@ public class ProcessPythonEnvironmentManagerTest {
private static ProcessPythonEnvironmentManager
createBasicPythonEnvironmentManager(
PythonDependencyInfo dependencyInfo) {
return new ProcessPythonEnvironmentManager(
- dependencyInfo, new String[] {tmpDir}, new HashMap<>());
+ dependencyInfo, new String[] {tmpDir}, new HashMap<>(), new
JobID());
}
}
diff --git
a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
index 0f93c85..82ac09b 100644
---
a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
+++
b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PythonTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.runtime.utils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
@@ -91,6 +92,7 @@ public final class PythonTestUtils {
return new ProcessPythonEnvironmentManager(
new PythonDependencyInfo(new HashMap<>(), null, null, new
HashMap<>(), "python"),
new String[] {System.getProperty("java.io.tmpdir")},
- env);
+ env,
+ new JobID());
}
}