This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 68bc4c8 [hotfix] Correcting improper uses of the Pemja (#275)
68bc4c8 is described below
commit 68bc4c801d95410a0b31c821848baa3e12a3d0d5
Author: Eugene <[email protected]>
AuthorDate: Fri Oct 17 16:00:28 2025 +0800
[hotfix] Correcting improper uses of the Pemja (#275)
---
.../runtime/python/utils/PythonActionExecutor.java | 35 ++++++++--------------
1 file changed, 12 insertions(+), 23 deletions(-)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 5b04322..014d450 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -40,16 +40,12 @@ public class PythonActionExecutor {
// =========== RUNNER CONTEXT ===========
private static final String CREATE_FLINK_RUNNER_CONTEXT =
"flink_runner_context.create_flink_runner_context";
- private static final String FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX = "flink_runner_context_";
- private static final AtomicLong FLINK_RUNNER_CONTEXT_REF_ID = new AtomicLong(0);
// ========== ASYNC THREAD POOL ===========
private static final String CREATE_ASYNC_THREAD_POOL =
"flink_runner_context.create_async_thread_pool";
private static final String CLOSE_ASYNC_THREAD_POOL =
"flink_runner_context.close_async_thread_pool";
- private static final String PYTHON_ASYNC_THREAD_POOL_REF_NAME = "python_async_thread_pool";
- private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_REF_ID = new AtomicLong(0);
// =========== PYTHON GENERATOR ===========
private static final String CALL_PYTHON_GENERATOR =
"function.call_python_generator";
@@ -66,7 +62,7 @@ public class PythonActionExecutor {
private final PythonEnvironmentManager environmentManager;
private final String agentPlanJson;
private PythonInterpreter interpreter;
- private String pythonAsyncThreadPoolObjectName;
+ private Object pythonAsyncThreadPool;
public PythonActionExecutor(PythonEnvironmentManager environmentManager,
String agentPlanJson) {
this.environmentManager = environmentManager;
@@ -80,14 +76,7 @@ public class PythonActionExecutor {
interpreter = env.getInterpreter();
interpreter.exec(PYTHON_IMPORTS);
- // TODO: remove the set and get thread pool after updating pemja to version 0.5.3. For more
- // details, please refer to
- // https://github.com/apache/flink-agents/issues/83.
- Object pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
- this.pythonAsyncThreadPoolObjectName =
- PYTHON_ASYNC_THREAD_POOL_REF_NAME
- + PYTHON_ASYNC_THREAD_POOL_REF_ID.incrementAndGet();
- interpreter.set(pythonAsyncThreadPoolObjectName, pythonAsyncThreadPool);
+ pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
}
/**
@@ -106,18 +95,12 @@ public class PythonActionExecutor {
runnerContext.checkNoPendingEvents();
function.setInterpreter(interpreter);
- // TODO: remove the set and get runner context after updating pemja to version 0.5.3. For
- // more details, please refer to https://github.com/apache/flink-agents/issues/83.
Object pythonRunnerContextObject =
interpreter.invoke(
CREATE_FLINK_RUNNER_CONTEXT,
runnerContext,
agentPlanJson,
- interpreter.get(pythonAsyncThreadPoolObjectName));
- String pythonRunnerContextObjectName =
- FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX
- + FLINK_RUNNER_CONTEXT_REF_ID.incrementAndGet();
- interpreter.set(pythonRunnerContextObjectName, pythonRunnerContextObject);
+ pythonAsyncThreadPool);
Object pythonEventObject =
interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, event.getEvent());
@@ -169,9 +152,15 @@ public class PythonActionExecutor {
}
public void close() throws Exception {
- if (pythonAsyncThreadPoolObjectName != null) {
- interpreter.invoke(
- CLOSE_ASYNC_THREAD_POOL, interpreter.get(pythonAsyncThreadPoolObjectName));
+ if (interpreter != null) {
+ if (pythonAsyncThreadPool != null) {
+ interpreter.invoke(CLOSE_ASYNC_THREAD_POOL, pythonAsyncThreadPool);
+ }
+ interpreter.close();
+ }
+
+ if (environmentManager != null) {
+ environmentManager.close();
}
}