codenohup commented on code in PR #80: URL: https://github.com/apache/flink-agents/pull/80#discussion_r2240270394
########## runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java: ########## @@ -39,62 +37,103 @@ public class PythonActionExecutor { "from flink_agents.plan import function\n" + "from flink_agents.runtime import flink_runner_context\n" + "from flink_agents.runtime import python_java_utils"; + + // =========== RUNNER CONTEXT =========== private static final String CREATE_FLINK_RUNNER_CONTEXT = "flink_runner_context.create_flink_runner_context"; + private static final String FLINK_RUNNER_CONTEXT_VAR_NAME_PREFIX = "flink_runner_context_"; + private static final AtomicLong FLINK_RUNNER_CONTEXT_VAR_ID = new AtomicLong(0); + + // ========== ASYNC THREAD POOL =========== + private static final String CREATE_ASYNC_THREAD_POOL = + "flink_runner_context.create_async_thread_pool"; + private static final String PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX = + "python_async_thread_pool"; + private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_VAR_ID = new AtomicLong(0); + + // =========== PYTHON GENERATOR =========== + private static final String CALL_PYTHON_GENERATOR = "function.call_python_generator"; + private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX = "python_generator_"; + private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new AtomicLong(0); + + // =========== PYTHON AND JAVA OBJECT CONVERT =========== private static final String CONVERT_TO_PYTHON_OBJECT = "python_java_utils.convert_to_python_object"; private static final String WRAP_TO_INPUT_EVENT = "python_java_utils.wrap_to_input_event"; private static final String GET_OUTPUT_FROM_OUTPUT_EVENT = "python_java_utils.get_output_from_output_event"; - private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = "flink_runner_context"; private final PythonEnvironmentManager environmentManager; private final String agentPlanJson; - private PythonRunnerContextImpl runnerContext; - private PythonInterpreter interpreter; + private String pythonAsyncThreadPoolObjectName; 
public PythonActionExecutor(PythonEnvironmentManager environmentManager, String agentPlanJson) { this.environmentManager = environmentManager; this.agentPlanJson = agentPlanJson; } - public void open( - MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState, - Runnable mailboxThreadChecker) - throws Exception { + public void open() throws Exception { environmentManager.open(); EmbeddedPythonEnvironment env = environmentManager.createEnvironment(); interpreter = env.getInterpreter(); interpreter.exec(PYTHON_IMPORTS); - runnerContext = new PythonRunnerContextImpl(shortTermMemState, mailboxThreadChecker); - - // TODO: remove the set and get runner context after updating pemja to version 0.5.3 - Object pythonRunnerContextObject = - interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext, agentPlanJson); - interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject); + // TODO: remove the set and get thread pool after updating pemja to version 0.5.3 + Object pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL); + this.pythonAsyncThreadPoolObjectName = + PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX + + PYTHON_ASYNC_THREAD_POOL_VAR_ID.incrementAndGet(); + interpreter.set(pythonAsyncThreadPoolObjectName, pythonAsyncThreadPool); } - public List<Event> executePythonFunction(PythonFunction function, PythonEvent event) + /** + * Execute the Python function, which may return a Python generator that needs to be processed + * in the future. Due to an issue in Pemja regarding incorrect object reference counting, this + * may lead to garbage collection of the object. To prevent this, we use the set and get methods + * to manually increment the object's reference count, then return the name of the Python + * generator variable. + * + * @return The name of the Python generator variable. It may be null if the Python function does + * not return a generator. 
+ */ + public String executePythonFunction( + PythonFunction function, PythonEvent event, RunnerContextImpl runnerContext) throws Exception { runnerContext.checkNoPendingEvents(); function.setInterpreter(interpreter); // TODO: remove the set and get runner context after updating pemja to version 0.5.3 - Object pythonRunnerContextObject = interpreter.get(FLINK_RUNNER_CONTEXT_VAR_NAME); + Object pythonRunnerContextObject = + interpreter.invoke( + CREATE_FLINK_RUNNER_CONTEXT, + runnerContext, + agentPlanJson, + interpreter.get(pythonAsyncThreadPoolObjectName)); + String pythonRunnerContextObjectName = + FLINK_RUNNER_CONTEXT_VAR_NAME_PREFIX + + FLINK_RUNNER_CONTEXT_VAR_ID.incrementAndGet(); Review Comment: > Why do we need to create a new object for each time calling the function? Actions may be executed in parallel, so we create a new runner context instance for each run of an action; this prevents different action instances from interfering with each other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org