xintongsong commented on code in PR #80:
URL: https://github.com/apache/flink-agents/pull/80#discussion_r2238354910


##########
python/flink_agents/runtime/local_runner.py:
##########
@@ -115,6 +115,23 @@ def get_short_term_memory(self) -> MemoryObject:
         """
         return self._short_term_memory
 
+    def execute_async(
+        self,
+        func: Callable[[Any], Any],
+        *args: Tuple[Any, ...],
+        **kwargs: Dict[str, Any],
+    ) -> Any:
+        """Asynchronously execute the provided function. Access to memory
+         is prohibited within the function.
+        """
+        logger.info(
+            "Local runner does not support asynchronous execution; falling 
back to synchronous execution."
+        )
+        func_result = func(*args, **kwargs)
+        yield func_result

Review Comment:
   Why do we need this yeild here?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -240,6 +308,90 @@ private List<Action> getActionsTriggeredBy(Event event) {
         }
     }
 
+    private <T> T pollFromListState(ListState<T> listState) throws Exception {
+        Iterator<T> listStateIterator = listState.get().iterator();
+        if (!listStateIterator.hasNext()) {
+            return null;
+        }
+
+        T polled = listStateIterator.next();
+        List<T> remaining = new ArrayList<>();
+        while (listStateIterator.hasNext()) {
+            remaining.add(listStateIterator.next());
+        }
+        listState.clear();
+        listState.update(remaining);
+        return polled;
+    }
+
+    private ActionTask createActionTask(Object key, Action action, Event 
event) {
+        if (action.getExec() instanceof JavaFunction) {
+            return new JavaActionTask(
+                    key,
+                    event,
+                    action,
+                    new RunnerContextImpl(shortTermMemState, 
this::checkMailboxThread));
+        } else if (action.getExec() instanceof PythonFunction) {
+            return new PythonActionTask(
+                    key,
+                    event,
+                    action,
+                    pythonActionExecutor,
+                    new PythonRunnerContextImpl(shortTermMemState, 
this::checkMailboxThread));
+        } else {
+            throw new IllegalStateException(
+                    "Unsupported action type: " + action.getExec().getClass());
+        }
+    }
+
+    private void addActionTask(ActionTask actionTask) throws Exception {
+        actionTasksState.add(actionTask);
+        mailboxExecutor.submit(
+                () -> processActionTaskWrapper(actionTask.getKey()), "process 
action task");

Review Comment:
   processActionTaskWrapper -> tryProcessActionFromKey



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -116,7 +137,23 @@ public void open() throws Exception {
                         TypeInformation.of(String.class),
                         TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
         shortTermMemState = 
getRuntimeContext().getMapState(shortTermMemStateDescriptor);
-        runnerContext = new RunnerContextImpl(shortTermMemState, 
this::checkMailboxThread);
+
+        // init agent processing related state
+        actionTasksState =
+                getRuntimeContext()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "actionTasks", 
TypeInformation.of(ActionTask.class)));
+        pendingInputEventsState =
+                getRuntimeContext()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "pendingInputEvents", 
TypeInformation.of(Event.class)));
+        currentProcessingKeysOperatorState =
+                getOperatorStateBackend()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "currentProcessingKeys", 
TypeInformation.of(Object.class)));

Review Comment:
   1. The fact, that `actionTasksState` and `pendingInputEventsState` are keyed 
states and `currentProcessingKeysOperatorState` is operator states, can be more 
explicit.
   - `actionTasksKState`
   - `pendingInputEventsKState`
   - `currentProcessingKeysOpState`



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -223,4 +239,16 @@ private List<Action> getActionsTriggeredBy(Event event) {
             return agentPlan.getActionsTriggeredBy(event.getClass().getName());
         }
     }
+
+    private MailboxProcessor getMailboxProcessor() throws Exception {
+        Field field = 
MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor");
+        field.setAccessible(true);
+        return (MailboxProcessor) field.get(mailboxExecutor);
+    }

Review Comment:
   I guess the elegant way is to add an interface on MailboxExecutor for 
getting the mailbox main thread, or verifying whether we are in the main thread.
   
   I'm okay with the workaround here, but let's add a TODO to fix this latter.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -126,60 +163,91 @@ public void open() throws Exception {
 
     @Override
     public void processElement(StreamRecord<IN> record) throws Exception {
+        checkError();
+
         IN input = record.getValue();
         LOG.debug("Receive an element {}", input);
 
-        // 1. wrap to InputEvent first
+        // wrap to InputEvent first
         Event inputEvent = wrapToInputEvent(input);
 
-        // 2. execute action
-        LinkedList<Event> events = new LinkedList<>();
-        events.push(inputEvent);
-        while (!events.isEmpty()) {
-            Event event = events.pop();
-            List<Action> actions = getActionsTriggeredBy(event);
-            if (actions != null && !actions.isEmpty()) {
-                for (Action action : actions) {
-                    // TODO: Support multi-action execution for a single 
event. Example: A Java
-                    // event
-                    // should be processable by both Java and Python actions.
-                    // TODO: Implement asynchronous action execution.
-
-                    // execute action and collect output events
-                    LOG.debug("Try execute action {} for event {}.", 
action.getName(), event);
-                    List<Event> actionOutputEvents;
-                    if (action.getExec() instanceof JavaFunction) {
-                        action.getExec().call(event, runnerContext);
-                        actionOutputEvents = runnerContext.drainEvents();
-                    } else if (action.getExec() instanceof PythonFunction) {
-                        checkState(event instanceof PythonEvent);
-                        actionOutputEvents =
-                                pythonActionExecutor.executePythonFunction(
-                                        (PythonFunction) action.getExec(), 
(PythonEvent) event);
-                    } else {
-                        throw new RuntimeException("Unsupported action type: " 
+ action.getClass());
-                    }
-
-                    for (Event actionOutputEvent : actionOutputEvents) {
-                        if (EventUtil.isOutputEvent(actionOutputEvent)) {
-                            OUT outputData = 
getOutputFromOutputEvent(actionOutputEvent);
-                            LOG.debug(
-                                    "Collect output data {} for input {} in 
action {}.",
-                                    outputData,
-                                    input,
-                                    action.getName());
-                            
output.collect(reusedStreamRecord.replace(outputData));
-                        } else {
-                            LOG.debug(
-                                    "Collect event {} for event {} in action 
{}.",
-                                    actionOutputEvent,
-                                    event,
-                                    action.getName());
-                            events.add(actionOutputEvent);
-                        }
-                    }
+        if (currentKeyHasMoreActionTask()) {
+            // If there are already actions being processed for the current 
key, the newly incoming
+            // event should be queued and processed later. Therefore, we add 
it to
+            // pendingInputEventsState.
+            pendingInputEventsState.add(inputEvent);
+        } else {
+            // Otherwise, the new event is processed immediately.
+            processEvent(getCurrentKey(), inputEvent);
+        }
+    }
+
+    private void processEvent(Object key, Event event) throws Exception {
+        if (EventUtil.isOutputEvent(event)) {
+            // If the event is an OutputEvent, we send it downstream.
+            OUT outputData = getOutputFromOutputEvent(event);
+            output.collect(reusedStreamRecord.replace(outputData));
+        } else {
+            if (EventUtil.isInputEvent(event)) {
+                // If the event is an InputEvent, we mark that the key is 
currently being processed.
+                currentProcessingKeysOperatorState.add(key);
+            }
+            // We then obtain the triggered action and add ActionTasks to the 
waiting processing
+            // queue.
+            List<Action> triggerActions = getActionsTriggeredBy(event);
+            for (Action triggerAction : triggerActions) {
+                addActionTask(createActionTask(key, triggerAction, event));
+            }
+        }
+    }
+
+    private void processActionTask(Object key) throws Exception {
+        setCurrentKey(key);
+        ActionTask actionTask = pollFromListState(actionTasksState);
+        if (actionTask == null) {
+            throw new RuntimeException(
+                    "A null value was encountered while trying to process the 
ActionTask. This likely indicates a bug in the internal logic.");
+        }
+        boolean finished = actionTask.invoke();
+        List<Event> outputEvents = actionTask.getOutputEvents();

Review Comment:
   It seems `getOutputEvents()` and `getGeneratedActionTask()` are only 
expected to be called after `invoke()`. Why not combine these three into a 
single method?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -251,4 +403,17 @@ private void checkMailboxThread() {
                 mailboxProcessor.isMailboxThread(),
                 "Expected to be running on the task mailbox thread, but was 
not.");
     }
+
+    private void checkError() {
+        if (errorCause != null) {

Review Comment:
   1. Is `errorCause` always accessed in the mailbox thread? Do we need to 
consider the thread safety?
   2. Is it guaranteed there's only one error?
   3. In which case will the error not be handled immediately?



##########
python/flink_agents/runtime/local_runner.py:
##########
@@ -115,6 +115,23 @@ def get_short_term_memory(self) -> MemoryObject:
         """
         return self._short_term_memory
 
+    def execute_async(
+        self,
+        func: Callable[[Any], Any],
+        *args: Tuple[Any, ...],
+        **kwargs: Dict[str, Any],
+    ) -> Any:
+        """Asynchronously execute the provided function. Access to memory
+         is prohibited within the function.
+        """
+        logger.info(

Review Comment:
   warn



##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java:
##########
@@ -39,62 +37,103 @@ public class PythonActionExecutor {
             "from flink_agents.plan import function\n"
                     + "from flink_agents.runtime import flink_runner_context\n"
                     + "from flink_agents.runtime import python_java_utils";
+
+    // =========== RUNNER CONTEXT ===========
     private static final String CREATE_FLINK_RUNNER_CONTEXT =
             "flink_runner_context.create_flink_runner_context";
+    private static final String FLINK_RUNNER_CONTEXT_VAR_NAME_PREFIX = 
"flink_runner_context_";
+    private static final AtomicLong FLINK_RUNNER_CONTEXT_VAR_ID = new 
AtomicLong(0);
+
+    // ========== ASYNC THREAD POOL ===========
+    private static final String CREATE_ASYNC_THREAD_POOL =
+            "flink_runner_context.create_async_thread_pool";
+    private static final String PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX =
+            "python_async_thread_pool";
+    private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_VAR_ID = new 
AtomicLong(0);
+
+    // =========== PYTHON GENERATOR ===========
+    private static final String CALL_PYTHON_GENERATOR = 
"function.call_python_generator";
+    private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX = 
"python_generator_";
+    private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new 
AtomicLong(0);
+
+    // =========== PYTHON AND JAVA OBJECT CONVERT ===========
     private static final String CONVERT_TO_PYTHON_OBJECT =
             "python_java_utils.convert_to_python_object";
     private static final String WRAP_TO_INPUT_EVENT = 
"python_java_utils.wrap_to_input_event";
     private static final String GET_OUTPUT_FROM_OUTPUT_EVENT =
             "python_java_utils.get_output_from_output_event";
-    private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = 
"flink_runner_context";
 
     private final PythonEnvironmentManager environmentManager;
     private final String agentPlanJson;
-    private PythonRunnerContextImpl runnerContext;
-
     private PythonInterpreter interpreter;
+    private String pythonAsyncThreadPoolObjectName;
 
     public PythonActionExecutor(PythonEnvironmentManager environmentManager, 
String agentPlanJson) {
         this.environmentManager = environmentManager;
         this.agentPlanJson = agentPlanJson;
     }
 
-    public void open(
-            MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState,
-            Runnable mailboxThreadChecker)
-            throws Exception {
+    public void open() throws Exception {
         environmentManager.open();
         EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
 
         interpreter = env.getInterpreter();
         interpreter.exec(PYTHON_IMPORTS);
 
-        runnerContext = new PythonRunnerContextImpl(shortTermMemState, 
mailboxThreadChecker);
-
-        // TODO: remove the set and get runner context after updating pemja to 
version 0.5.3
-        Object pythonRunnerContextObject =
-                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext, 
agentPlanJson);
-        interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, 
pythonRunnerContextObject);
+        // TODO: remove the set and get thread pool after updating pemja to 
version 0.5.3
+        Object pythonAsyncThreadPool = 
interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
+        this.pythonAsyncThreadPoolObjectName =
+                PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX
+                        + PYTHON_ASYNC_THREAD_POOL_VAR_ID.incrementAndGet();
+        interpreter.set(pythonAsyncThreadPoolObjectName, 
pythonAsyncThreadPool);
     }
 
-    public List<Event> executePythonFunction(PythonFunction function, 
PythonEvent event)
+    /**
+     * Execute the Python function, which may return a Python generator that 
needs to be processed
+     * in the future. Due to an issue in Pemja regarding incorrect object 
reference counting, this
+     * may lead to garbage collection of the object. To prevent this, we use 
the set and get methods
+     * to manually increment the object's reference count, then return the 
name of the Python
+     * generator variable.
+     *
+     * @return The name of the Python generator variable. It may be null if 
the Python function does
+     *     not return a generator.
+     */
+    public String executePythonFunction(
+            PythonFunction function, PythonEvent event, RunnerContextImpl 
runnerContext)
             throws Exception {
         runnerContext.checkNoPendingEvents();
         function.setInterpreter(interpreter);
 
         // TODO: remove the set and get runner context after updating pemja to 
version 0.5.3
-        Object pythonRunnerContextObject = 
interpreter.get(FLINK_RUNNER_CONTEXT_VAR_NAME);
+        Object pythonRunnerContextObject =
+                interpreter.invoke(
+                        CREATE_FLINK_RUNNER_CONTEXT,
+                        runnerContext,
+                        agentPlanJson,
+                        interpreter.get(pythonAsyncThreadPoolObjectName));
+        String pythonRunnerContextObjectName =
+                FLINK_RUNNER_CONTEXT_VAR_NAME_PREFIX
+                        + FLINK_RUNNER_CONTEXT_VAR_ID.incrementAndGet();

Review Comment:
   Why do we need to create a new object for each time calling the function?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -240,6 +308,90 @@ private List<Action> getActionsTriggeredBy(Event event) {
         }
     }
 
+    private <T> T pollFromListState(ListState<T> listState) throws Exception {
+        Iterator<T> listStateIterator = listState.get().iterator();
+        if (!listStateIterator.hasNext()) {
+            return null;
+        }
+
+        T polled = listStateIterator.next();
+        List<T> remaining = new ArrayList<>();
+        while (listStateIterator.hasNext()) {
+            remaining.add(listStateIterator.next());
+        }
+        listState.clear();
+        listState.update(remaining);
+        return polled;
+    }
+
+    private ActionTask createActionTask(Object key, Action action, Event 
event) {
+        if (action.getExec() instanceof JavaFunction) {
+            return new JavaActionTask(
+                    key,
+                    event,
+                    action,
+                    new RunnerContextImpl(shortTermMemState, 
this::checkMailboxThread));
+        } else if (action.getExec() instanceof PythonFunction) {
+            return new PythonActionTask(
+                    key,
+                    event,
+                    action,
+                    pythonActionExecutor,
+                    new PythonRunnerContextImpl(shortTermMemState, 
this::checkMailboxThread));
+        } else {
+            throw new IllegalStateException(
+                    "Unsupported action type: " + action.getExec().getClass());
+        }
+    }
+
+    private void addActionTask(ActionTask actionTask) throws Exception {
+        actionTasksState.add(actionTask);
+        mailboxExecutor.submit(
+                () -> processActionTaskWrapper(actionTask.getKey()), "process 
action task");

Review Comment:
   It seems we are assuming the actions in `actionTasksState` are consistent 
with `processActionTaskWrapper` in the mailbox queue. This seems fragile. We 
should control the order of actions at one place.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -116,7 +137,23 @@ public void open() throws Exception {
                         TypeInformation.of(String.class),
                         TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
         shortTermMemState = 
getRuntimeContext().getMapState(shortTermMemStateDescriptor);
-        runnerContext = new RunnerContextImpl(shortTermMemState, 
this::checkMailboxThread);
+
+        // init agent processing related state
+        actionTasksState =
+                getRuntimeContext()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "actionTasks", 
TypeInformation.of(ActionTask.class)));
+        pendingInputEventsState =
+                getRuntimeContext()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "pendingInputEvents", 
TypeInformation.of(Event.class)));
+        currentProcessingKeysOperatorState =
+                getOperatorStateBackend()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "currentProcessingKeys", 
TypeInformation.of(Object.class)));

Review Comment:
   2. How do we handle the redistribution of the operator state?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.operator;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.PythonFunction;
+import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.event.PythonEvent;
+import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A special {@link ActionTask} designed to execute a Python action task.
+ *
+ * <p>During asynchronous execution in Python, the {@link PythonActionTask} 
can produce a {@link
+ * PythonGeneratorActionTask} to represent the subsequent code block when 
needed.
+ */
+public class PythonActionTask extends ActionTask {
+
+    protected final PythonActionExecutor pythonActionExecutor;
+
+    public PythonActionTask(
+            Object key,
+            Event event,
+            Action action,
+            PythonActionExecutor pythonActionExecutor,
+            RunnerContextImpl runnerContext) {
+        super(key, event, action, runnerContext);
+        checkState(action.getExec() instanceof PythonFunction);
+        checkState(
+                event instanceof PythonEvent,
+                "Python action only accept python event, but got " + event);
+        this.pythonActionExecutor = pythonActionExecutor;
+    }
+
+    public boolean invoke() throws Exception {
+        LOG.debug(
+                "Try execute python action {} for event {} with key {}.",
+                action.getName(),
+                event,
+                key);
+        runnerContext.checkNoPendingEvents();
+
+        String pythonGeneratorVarName =
+                pythonActionExecutor.executePythonFunction(
+                        (PythonFunction) action.getExec(), (PythonEvent) 
event, runnerContext);
+        if (pythonGeneratorVarName != null) {
+            // The Python action generates a generator. We need to execute it 
once, which will
+            // submit an asynchronous task and return whether the action has 
been completed.

Review Comment:
   I think the key information here, which should be explained, is that we 
don't want to convert actions which don't call `async_execute()` into 
generators.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.operator;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.PythonFunction;
+import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.event.PythonEvent;
+import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A special {@link ActionTask} designed to execute a Python action task.
+ *
+ * <p>During asynchronous execution in Python, the {@link PythonActionTask} 
can produce a {@link
+ * PythonGeneratorActionTask} to represent the subsequent code block when 
needed.
+ */
+public class PythonActionTask extends ActionTask {
+
+    protected final PythonActionExecutor pythonActionExecutor;
+
+    public PythonActionTask(
+            Object key,
+            Event event,
+            Action action,
+            PythonActionExecutor pythonActionExecutor,
+            RunnerContextImpl runnerContext) {
+        super(key, event, action, runnerContext);
+        checkState(action.getExec() instanceof PythonFunction);
+        checkState(
+                event instanceof PythonEvent,
+                "Python action only accept python event, but got " + event);
+        this.pythonActionExecutor = pythonActionExecutor;
+    }
+
+    public boolean invoke() throws Exception {
+        LOG.debug(
+                "Try execute python action {} for event {} with key {}.",
+                action.getName(),
+                event,
+                key);
+        runnerContext.checkNoPendingEvents();
+
+        String pythonGeneratorVarName =

Review Comment:
   Or create a Java class for python generator?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.operator;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
+
+/** An {@link ActionTask} wrapper a Python Generator to represent a code block 
in Python action. */
+public class PythonGeneratorActionTask extends PythonActionTask {
+    private final String pythonGeneratorVarName;
+
+    public PythonGeneratorActionTask(
+            Object key,
+            Event event,
+            Action action,
+            PythonActionExecutor pythonActionExecutor,
+            RunnerContextImpl runnerContext,
+            String pythonGeneratorVarName) {
+        super(key, event, action, pythonActionExecutor, runnerContext);
+        this.pythonGeneratorVarName = pythonGeneratorVarName;
+    }
+
+    @Override
+    public boolean invoke() throws Exception {
+        LOG.debug(
+                "Try execute python generator action {} for event {} with key 
{}.",
+                action.getName(),
+                event,
+                key);
+        boolean finished = 
pythonActionExecutor.callPythonGenerator(pythonGeneratorVarName).f0;

Review Comment:
   `f1` is never used?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java:
##########
@@ -39,62 +37,103 @@ public class PythonActionExecutor {
             "from flink_agents.plan import function\n"
                     + "from flink_agents.runtime import flink_runner_context\n"
                     + "from flink_agents.runtime import python_java_utils";
+
+    // =========== RUNNER CONTEXT ===========
     private static final String CREATE_FLINK_RUNNER_CONTEXT =
             "flink_runner_context.create_flink_runner_context";
+    private static final String FLINK_RUNNER_CONTEXT_VAR_NAME_PREFIX = 
"flink_runner_context_";
+    private static final AtomicLong FLINK_RUNNER_CONTEXT_VAR_ID = new 
AtomicLong(0);
+
+    // ========== ASYNC THREAD POOL ===========
+    private static final String CREATE_ASYNC_THREAD_POOL =
+            "flink_runner_context.create_async_thread_pool";
+    private static final String PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX =
+            "python_async_thread_pool";
+    private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_VAR_ID = new 
AtomicLong(0);
+
+    // =========== PYTHON GENERATOR ===========
+    private static final String CALL_PYTHON_GENERATOR = 
"function.call_python_generator";
+    private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX = 
"python_generator_";
+    private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new 
AtomicLong(0);
+
+    // =========== PYTHON AND JAVA OBJECT CONVERT ===========
     private static final String CONVERT_TO_PYTHON_OBJECT =
             "python_java_utils.convert_to_python_object";
     private static final String WRAP_TO_INPUT_EVENT = 
"python_java_utils.wrap_to_input_event";
     private static final String GET_OUTPUT_FROM_OUTPUT_EVENT =
             "python_java_utils.get_output_from_output_event";
-    private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = 
"flink_runner_context";
 
     private final PythonEnvironmentManager environmentManager;
     private final String agentPlanJson;
-    private PythonRunnerContextImpl runnerContext;
-
     private PythonInterpreter interpreter;
+    private String pythonAsyncThreadPoolObjectName;
 
     public PythonActionExecutor(PythonEnvironmentManager environmentManager, 
String agentPlanJson) {
         this.environmentManager = environmentManager;
         this.agentPlanJson = agentPlanJson;
     }
 
-    public void open(
-            MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState,
-            Runnable mailboxThreadChecker)
-            throws Exception {
+    public void open() throws Exception {
         environmentManager.open();
         EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
 
         interpreter = env.getInterpreter();
         interpreter.exec(PYTHON_IMPORTS);
 
-        runnerContext = new PythonRunnerContextImpl(shortTermMemState, 
mailboxThreadChecker);
-
-        // TODO: remove the set and get runner context after updating pemja to 
version 0.5.3
-        Object pythonRunnerContextObject =
-                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext, 
agentPlanJson);
-        interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, 
pythonRunnerContextObject);
+        // TODO: remove the set and get thread pool after updating pemja to 
version 0.5.3
+        Object pythonAsyncThreadPool = 
interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
+        this.pythonAsyncThreadPoolObjectName =
+                PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX
+                        + PYTHON_ASYNC_THREAD_POOL_VAR_ID.incrementAndGet();
+        interpreter.set(pythonAsyncThreadPoolObjectName, 
pythonAsyncThreadPool);

Review Comment:
   1. What does this todo mean?
   2. Is it necessary to have separate thread pools for each operator?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.python.operator;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.PythonFunction;
+import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.event.PythonEvent;
+import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A special {@link ActionTask} designed to execute a Python action task.
+ *
+ * <p>During asynchronous execution in Python, the {@link PythonActionTask} 
can produce a {@link
+ * PythonGeneratorActionTask} to represent the subsequent code block when 
needed.
+ */
+public class PythonActionTask extends ActionTask {
+
+    protected final PythonActionExecutor pythonActionExecutor;
+
+    public PythonActionTask(
+            Object key,
+            Event event,
+            Action action,
+            PythonActionExecutor pythonActionExecutor,
+            RunnerContextImpl runnerContext) {
+        super(key, event, action, runnerContext);
+        checkState(action.getExec() instanceof PythonFunction);
+        checkState(
+                event instanceof PythonEvent,
+                "Python action only accept python event, but got " + event);
+        this.pythonActionExecutor = pythonActionExecutor;
+    }
+
+    public boolean invoke() throws Exception {
+        LOG.debug(
+                "Try execute python action {} for event {} with key {}.",
+                action.getName(),
+                event,
+                key);
+        runnerContext.checkNoPendingEvents();
+
+        String pythonGeneratorVarName =

Review Comment:
   The name is confusing. Maybe `pythonGeneratorRef`?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -240,6 +308,90 @@ private List<Action> getActionsTriggeredBy(Event event) {
         }
     }
 
+    private <T> T pollFromListState(ListState<T> listState) throws Exception {

Review Comment:
   1. These list state operations seems heavy. They should be avoid as much as 
possible.
   2. We might move them into a util.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to