This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f21451  [Feature][runtime] Introducing the FlinkRunnerContext component to support Python workflow execution (#43)
2f21451 is described below

commit 2f21451e5c3d32bfef5fbdb46407e0a49e1a79f5
Author: Eugene <105473769+greateugen...@users.noreply.github.com>
AuthorDate: Tue Jul 1 10:30:51 2025 +0800

    [Feature][runtime] Introducing the FlinkRunnerContext component to support Python workflow execution (#43)
---
 plan/pom.xml                                       |  5 ++
 .../apache/flink/agents/plan/PythonFunction.java   | 18 ++++-
 python/flink_agents/plan/function.py               |  6 ++
 .../flink_agents/runtime/flink_runner_context.py   | 61 +++++++++++++++
 python/flink_agents/runtime/python_java_utils.py   | 24 ++++++
 runtime/pom.xml                                    |  5 ++
 .../flink/agents/runtime/PythonActionExecutor.java | 90 ++++++++++++++++++++++
 .../apache/flink/agents/runtime/PythonEvent.java   | 41 ++++++++++
 .../runtime/context/PythonRunnerContext.java       | 54 +++++++++++++
 .../runtime/env/PythonEnvironmentManager.java      |  3 +-
 .../runtime/env/PythonEnvironmentManagerTest.java  |  9 +--
 11 files changed, 306 insertions(+), 10 deletions(-)

diff --git a/plan/pom.xml b/plan/pom.xml
index adf46b1..6d1d69d 100644
--- a/plan/pom.xml
+++ b/plan/pom.xml
@@ -39,6 +39,11 @@ under the License.
             <artifactId>flink-shaded-jackson</artifactId>
             <version>${flink.shaded.jackson.version}-${flink.shaded.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>pemja</artifactId>
+            <version>${pemja.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
index 4d8f100..3c7d48f 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
@@ -17,22 +17,36 @@
  */
 package org.apache.flink.agents.plan;
 
-// TODO: implement.
+import pemja.core.PythonInterpreter;
+
 /** Represent a Python function. */
 public class PythonFunction implements Function {
+    private static final String CALL_PYTHON_FUNCTION = 
"function.call_python_function";
+
     private final String module;
     private final String qualName;
 
+    private transient PythonInterpreter interpreter;
+
     public PythonFunction(String module, String qualName) {
         this.module = module;
         this.qualName = qualName;
     }
 
+    public void setInterpreter(PythonInterpreter interpreter) {
+        this.interpreter = interpreter;
+    }
+
     @Override
     public Object call(Object... args) throws Exception {
-        throw new UnsupportedOperationException();
+        if (interpreter == null) {
+            throw new IllegalStateException("Python interpreter is not set.");
+        }
+
+        return interpreter.invoke(CALL_PYTHON_FUNCTION, module, qualName, 
args);
     }
 
+    // TODO: check Python function signature compatibility with given 
parameter types
     @Override
     public void checkSignature(Class<?>[] parameterTypes) throws Exception {
         throw new UnsupportedOperationException();
diff --git a/python/flink_agents/plan/function.py b/python/flink_agents/plan/function.py
index cf1f3e2..a2ae521 100644
--- a/python/flink_agents/plan/function.py
+++ b/python/flink_agents/plan/function.py
@@ -141,3 +141,9 @@ class JavaFunction(Function):
 
     def check_signature(self, *args: Tuple[Any, ...]) -> None:
         """Check function signature is legal or not."""
+
+
+def call_python_function(module: str, qualname: str, func_args: Tuple[Any, 
...]) -> Any:
+    """Used to call a Python function in the Pemja environment."""
+    func = PythonFunction(module=module, qualname=qualname)
+    return func(*func_args)
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
new file mode 100644
index 0000000..2491ff9
--- /dev/null
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any
+
+import cloudpickle
+from typing_extensions import override
+
+from flink_agents.api.event import Event
+from flink_agents.api.runner_context import RunnerContext
+
+
+class FlinkRunnerContext(RunnerContext):
+    """Providing context for workflow execution in Flink Environment.
+
+    This context allows access to event handling.
+    """
+
+    def __init__(self, j_runner_context: Any) -> None:
+        """Initialize a flink runner context with the given java runner 
context.
+
+        Parameters
+        ----------
+        j_runner_context : Any
+            Java runner context used to synchronize data between Python and 
Java.
+        """
+        self._j_runner_context = j_runner_context
+
+    @override
+    def send_event(self, event: Event) -> None:
+        """Send an event to the workflow for processing.
+
+        Parameters
+        ----------
+        event : Event
+            The event to be processed by the workflow system.
+        """
+        try:
+            class_path = 
f"{event.__class__.__module__}.{event.__class__.__qualname__}"
+            self._j_runner_context.sendEvent(class_path, 
cloudpickle.dumps(event))
+        except Exception as e:
+            err_msg = "Failed to send event " + class_path + " to runner 
context"
+            raise RuntimeError(err_msg) from e
+
+def create_flink_runner_context(j_runner_context: Any) -> FlinkRunnerContext:
+    """Used to create a FlinkRunnerContext Python object in Pemja 
environment."""
+    return FlinkRunnerContext(j_runner_context)
diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py
new file mode 100644
index 0000000..24ec623
--- /dev/null
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any
+
+import cloudpickle
+
+
+def convert_to_python_object(bytesObject: bytes) -> Any:
+    """Used for deserializing Python objects."""
+    return cloudpickle.loads(bytesObject)
diff --git a/runtime/pom.xml b/runtime/pom.xml
index 46657de..53bcebb 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -29,6 +29,11 @@ under the License.
     <name>Flink Agents : Runtime</name>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-plan</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonActionExecutor.java
new file mode 100644
index 0000000..652356d
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonActionExecutor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.plan.PythonFunction;
+import org.apache.flink.agents.runtime.context.PythonRunnerContext;
+import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
+import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
+import pemja.core.PythonInterpreter;
+
+import java.util.List;
+
+/**
+ * Execute the corresponding Python action in the workflow.
+ *
+ * <p>{@link #open()} must be called before {@link #executePythonFunction}, and {@link #close()}
+ * should be called once the executor is no longer needed so that the Python interpreter and the
+ * environment manager opened by {@link #open()} are released.
+ */
+public class PythonActionExecutor implements AutoCloseable {
+
+    // Executed once in open() so that the helper modules below can be referenced by name.
+    private static final String PYTHON_IMPORTS =
+            "from flink_agents.plan import function\n"
+                    + "from flink_agents.runtime import flink_runner_context\n"
+                    + "from flink_agents.runtime import python_java_utils";
+    private static final String CREATE_FLINK_RUNNER_CONTEXT =
+            "flink_runner_context.create_flink_runner_context";
+    private static final String CONVERT_TO_PYTHON_OBJECT =
+            "python_java_utils.convert_to_python_object";
+    // NOTE(review): this variable name is identical to the module name imported in
+    // PYTHON_IMPORTS; presumably interpreter.set() rebinds that name, which would make
+    // CREATE_FLINK_RUNNER_CONTEXT unresolvable after the first open() - confirm.
+    private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = "flink_runner_context";
+
+    private final PythonEnvironmentManager environmentManager;
+    private final PythonRunnerContext runnerContext;
+
+    private PythonInterpreter interpreter;
+
+    /**
+     * Creates an executor backed by the given environment manager.
+     *
+     * @param environmentManager manager used in {@link #open()} to create the Python environment
+     */
+    public PythonActionExecutor(PythonEnvironmentManager environmentManager) {
+        this.environmentManager = environmentManager;
+        this.runnerContext = new PythonRunnerContext();
+    }
+
+    /**
+     * Opens the environment manager, obtains the Python interpreter, runs the required imports
+     * and stores the Python-side runner context in the interpreter.
+     *
+     * @throws Exception if the environment or the interpreter cannot be set up
+     */
+    public void open() throws Exception {
+        environmentManager.open();
+        EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
+
+        interpreter = env.getInterpreter();
+        interpreter.exec(PYTHON_IMPORTS);
+
+        // TODO: remove the set and get runner context after updating pemja to version 0.5.3
+        Object pythonRunnerContextObject =
+                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext);
+        interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject);
+    }
+
+    /**
+     * Runs the given Python function on the given event and returns the events it sent.
+     *
+     * @param function the Python function to call; this executor's interpreter is set on it
+     * @param event the input event whose bytes are converted to a Python object first
+     * @return the events sent through the runner context during the call
+     * @throws IllegalStateException if {@link #open()} has not been called, or if events from a
+     *     previous call are still pending
+     * @throws PythonActionExecutionException if the Python function call fails
+     * @throws Exception if converting the event or accessing the interpreter fails
+     */
+    public List<PythonEvent> executePythonFunction(PythonFunction function, PythonEvent event)
+            throws Exception {
+        // Fail with a clear message instead of a NullPointerException when open() was skipped.
+        if (interpreter == null) {
+            throw new IllegalStateException(
+                    "Python interpreter is not initialized, call open() first.");
+        }
+        runnerContext.checkNoPendingEvents();
+        function.setInterpreter(interpreter);
+
+        // TODO: remove the set and get runner context after updating pemja to version 0.5.3
+        Object pythonRunnerContextObject = interpreter.get(FLINK_RUNNER_CONTEXT_VAR_NAME);
+
+        Object pythonEventObject = interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, event.getEvent());
+
+        try {
+            function.call(pythonEventObject, pythonRunnerContextObject);
+        } catch (Exception e) {
+            // Discard events sent before the failure so the next call starts from a clean state.
+            runnerContext.drainEvents();
+            throw new PythonActionExecutionException("Failed to execute Python action", e);
+        }
+
+        return runnerContext.drainEvents();
+    }
+
+    /**
+     * Releases the Python interpreter obtained in {@link #open()} and closes the environment
+     * manager. The environment manager is closed even if closing the interpreter fails.
+     *
+     * @throws Exception if closing the interpreter or the environment manager fails
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (interpreter != null) {
+                interpreter.close();
+                interpreter = null;
+            }
+        } finally {
+            environmentManager.close();
+        }
+    }
+
+    /** Failed to execute Python action. */
+    public static class PythonActionExecutionException extends Exception {
+        public PythonActionExecutionException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonEvent.java
new file mode 100644
index 0000000..6bb277e
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.Event;
+
+/** An {@link Event} that carries a Python event as a byte array together with its type name. */
+public class PythonEvent extends Event {
+    private final String eventType;
+    private final byte[] event;
+
+    /**
+     * Creates a Python event.
+     *
+     * @param event the bytes representing the Python event
+     * @param eventType the type name of the Python event
+     */
+    public PythonEvent(byte[] event, String eventType) {
+        this.eventType = eventType;
+        this.event = event;
+    }
+
+    /** Returns the byte array given at construction; no copy is made. */
+    public byte[] getEvent() {
+        return this.event;
+    }
+
+    /** Returns the event type given at construction. */
+    public String getEventType() {
+        return this.eventType;
+    }
+}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/PythonRunnerContext.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/PythonRunnerContext.java
new file mode 100644
index 0000000..e3e8e12
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/PythonRunnerContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.context;
+
+import org.apache.flink.agents.runtime.PythonEvent;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is used to manage the execution context when interacting with 
Python functions,
+ * including collecting new events generated during the execution of actions.
+ */
+@NotThreadSafe
+public class PythonRunnerContext {
+    private final List<PythonEvent> pendingEvents;
+
+    public PythonRunnerContext() {
+        this.pendingEvents = new ArrayList<>();
+    }
+
+    public void sendEvent(String type, byte[] event) {
+        this.pendingEvents.add(new PythonEvent(event, type));
+    }
+
+    public List<PythonEvent> drainEvents() {
+        List<PythonEvent> list = new ArrayList<>(this.pendingEvents);
+        this.pendingEvents.clear();
+        return list;
+    }
+
+    public void checkNoPendingEvents() {
+        Preconditions.checkState(
+                this.pendingEvents.isEmpty(), "There are pending events 
remaining in the context.");
+    }
+}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java
index 0e554c0..99429da 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.python.env.AbstractPythonEnvironmentManager;
 import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironment;
 import pemja.core.PythonInterpreterConfig;
 
 import java.util.HashMap;
@@ -43,7 +42,7 @@ public class PythonEnvironmentManager extends AbstractPythonEnvironmentManager {
     }
 
     @Override
-    public PythonEnvironment createEnvironment() {
+    public EmbeddedPythonEnvironment createEnvironment() {
         Map<String, String> env = new HashMap<>(getPythonEnv());
 
         PythonInterpreterConfig.PythonInterpreterConfigBuilder 
interpreterConfigBuilder =
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java
index e90f4c4..d3f86aa 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java
@@ -151,8 +151,7 @@ public class PythonEnvironmentManagerTest {
             expected.put("python", "/usr/local/bin/python");
             assertEquals(expected, environmentVariable);
 
-            EmbeddedPythonEnvironment env =
-                    (EmbeddedPythonEnvironment) 
environmentManager.createEnvironment();
+            EmbeddedPythonEnvironment env = 
environmentManager.createEnvironment();
 
             assertEquals(expected, env.getEnv());
 
@@ -227,8 +226,7 @@ public class PythonEnvironmentManagerTest {
                                     File.separator, baseDir, PYTHON_FILES_DIR, 
"dir0", "test_dir")),
                     false);
 
-            EmbeddedPythonEnvironment env =
-                    (EmbeddedPythonEnvironment) 
environmentManager.createEnvironment();
+            EmbeddedPythonEnvironment env = 
environmentManager.createEnvironment();
             PythonInterpreterConfig config = env.getConfig();
             assertArrayEquals(expectedPythonPath.split(File.pathSeparator), 
config.getPaths());
         }
@@ -265,8 +263,7 @@ public class PythonEnvironmentManagerTest {
                     new File(String.join(File.separator, tmpBase, 
PYTHON_ARCHIVES_DIR, "py312")),
                     true);
 
-            EmbeddedPythonEnvironment env =
-                    (EmbeddedPythonEnvironment) 
environmentManager.createEnvironment();
+            EmbeddedPythonEnvironment env = 
environmentManager.createEnvironment();
 
             assertEquals(expected, env.getEnv());
 

Reply via email to