This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push: new 2f21451 [Feature][runtime] Introducing the FlinkRunnerContext component to support Python workflow execution (#43) 2f21451 is described below commit 2f21451e5c3d32bfef5fbdb46407e0a49e1a79f5 Author: Eugene <105473769+greateugen...@users.noreply.github.com> AuthorDate: Tue Jul 1 10:30:51 2025 +0800 [Feature][runtime] Introducing the FlinkRunnerContext component to support Python workflow execution (#43) --- plan/pom.xml | 5 ++ .../apache/flink/agents/plan/PythonFunction.java | 18 ++++- python/flink_agents/plan/function.py | 6 ++ .../flink_agents/runtime/flink_runner_context.py | 61 +++++++++++++++ python/flink_agents/runtime/python_java_utils.py | 24 ++++++ runtime/pom.xml | 5 ++ .../flink/agents/runtime/PythonActionExecutor.java | 90 ++++++++++++++++++++++ .../apache/flink/agents/runtime/PythonEvent.java | 41 ++++++++++ .../runtime/context/PythonRunnerContext.java | 54 +++++++++++++ .../runtime/env/PythonEnvironmentManager.java | 3 +- .../runtime/env/PythonEnvironmentManagerTest.java | 9 +-- 11 files changed, 306 insertions(+), 10 deletions(-) diff --git a/plan/pom.xml b/plan/pom.xml index adf46b1..6d1d69d 100644 --- a/plan/pom.xml +++ b/plan/pom.xml @@ -39,6 +39,11 @@ under the License. <artifactId>flink-shaded-jackson</artifactId> <version>${flink.shaded.jackson.version}-${flink.shaded.version}</version> </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>pemja</artifactId> + <version>${pemja.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java index 4d8f100..3c7d48f 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java @@ -17,22 +17,36 @@ */ package org.apache.flink.agents.plan; -// TODO: implement. +import pemja.core.PythonInterpreter; + /** Represent a Python function. */ public class PythonFunction implements Function { + private static final String CALL_PYTHON_FUNCTION = "function.call_python_function"; + private final String module; private final String qualName; + private transient PythonInterpreter interpreter; + public PythonFunction(String module, String qualName) { this.module = module; this.qualName = qualName; } + public void setInterpreter(PythonInterpreter interpreter) { + this.interpreter = interpreter; + } + @Override public Object call(Object... args) throws Exception { - throw new UnsupportedOperationException(); + if (interpreter == null) { + throw new IllegalStateException("Python interpreter is not set."); + } + + return interpreter.invoke(CALL_PYTHON_FUNCTION, module, qualName, args); } + // TODO: check Python function signature compatibility with given parameter types @Override public void checkSignature(Class<?>[] parameterTypes) throws Exception { throw new UnsupportedOperationException(); diff --git a/python/flink_agents/plan/function.py b/python/flink_agents/plan/function.py index cf1f3e2..a2ae521 100644 --- a/python/flink_agents/plan/function.py +++ b/python/flink_agents/plan/function.py @@ -141,3 +141,9 @@ class JavaFunction(Function): def check_signature(self, *args: Tuple[Any, ...]) -> None: """Check function signature is legal or not.""" + + +def call_python_function(module: str, qualname: str, func_args: Tuple[Any, ...]) -> Any: + """Used to call a Python function in the Pemja environment.""" + func = PythonFunction(module=module, qualname=qualname) + return func(*func_args) diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py new file mode 100644 index 0000000..2491ff9 --- /dev/null +++ b/python/flink_agents/runtime/flink_runner_context.py @@ -0,0 +1,61 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import Any + +import cloudpickle +from typing_extensions import override + +from flink_agents.api.event import Event +from flink_agents.api.runner_context import RunnerContext + + +class FlinkRunnerContext(RunnerContext): + """Providing context for workflow execution in Flink Environment. + + This context allows access to event handling. + """ + + def __init__(self, j_runner_context: Any) -> None: + """Initialize a flink runner context with the given java runner context. + + Parameters + ---------- + j_runner_context : Any + Java runner context used to synchronize data between Python and Java. + """ + self._j_runner_context = j_runner_context + + @override + def send_event(self, event: Event) -> None: + """Send an event to the workflow for processing. + + Parameters + ---------- + event : Event + The event to be processed by the workflow system. + """ + try: + class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}" + self._j_runner_context.sendEvent(class_path, cloudpickle.dumps(event)) + except Exception as e: + err_msg = "Failed to send event " + class_path + " to runner context" + raise RuntimeError(err_msg) from e + +def create_flink_runner_context(j_runner_context: Any) -> FlinkRunnerContext: + """Used to create a FlinkRunnerContext Python object in Pemja environment.""" + return FlinkRunnerContext(j_runner_context) diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py new file mode 100644 index 0000000..24ec623 --- /dev/null +++ b/python/flink_agents/runtime/python_java_utils.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Any + +import cloudpickle + + +def convert_to_python_object(bytesObject: bytes) -> Any: + """Used for deserializing Python objects.""" + return cloudpickle.loads(bytesObject) diff --git a/runtime/pom.xml b/runtime/pom.xml index 46657de..53bcebb 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -29,6 +29,11 @@ under the License. <name>Flink Agents : Runtime</name> <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-agents-plan</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonActionExecutor.java new file mode 100644 index 0000000..652356d --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonActionExecutor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime; + +import org.apache.flink.agents.plan.PythonFunction; +import org.apache.flink.agents.runtime.context.PythonRunnerContext; +import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment; +import org.apache.flink.agents.runtime.env.PythonEnvironmentManager; +import pemja.core.PythonInterpreter; + +import java.util.List; + +/** Execute the corresponding Python action in the workflow. */ +public class PythonActionExecutor { + + private static final String PYTHON_IMPORTS = + "from flink_agents.plan import function\n" + + "from flink_agents.runtime import flink_runner_context\n" + + "from flink_agents.runtime import python_java_utils"; + private static final String CREATE_FLINK_RUNNER_CONTEXT = + "flink_runner_context.create_flink_runner_context"; + private static final String CONVERT_TO_PYTHON_OBJECT = + "python_java_utils.convert_to_python_object"; + private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = "flink_runner_context"; + + private final PythonEnvironmentManager environmentManager; + private final PythonRunnerContext runnerContext; + + private PythonInterpreter interpreter; + + public PythonActionExecutor(PythonEnvironmentManager environmentManager) { + this.environmentManager = environmentManager; + this.runnerContext = new PythonRunnerContext(); + } + + public void open() throws Exception { + environmentManager.open(); + EmbeddedPythonEnvironment env = environmentManager.createEnvironment(); + + interpreter = env.getInterpreter(); + interpreter.exec(PYTHON_IMPORTS); + + // TODO: remove the set and get runner context after updating pemja to version 0.5.3 + Object pythonRunnerContextObject = + interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext); + interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject); + } + + public List<PythonEvent> executePythonFunction(PythonFunction function, PythonEvent event) + throws Exception { + runnerContext.checkNoPendingEvents(); + function.setInterpreter(interpreter); + + // TODO: remove the set and get runner context after updating pemja to version 0.5.3 + Object pythonRunnerContextObject = interpreter.get(FLINK_RUNNER_CONTEXT_VAR_NAME); + + Object pythonEventObject = interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, event.getEvent()); + + try { + function.call(pythonEventObject, pythonRunnerContextObject); + } catch (Exception e) { + runnerContext.drainEvents(); + throw new PythonActionExecutionException("Failed to execute Python action", e); + } + + return runnerContext.drainEvents(); + } + + /** Failed to execute Python action. */ + public static class PythonActionExecutionException extends Exception { + public PythonActionExecutionException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonEvent.java new file mode 100644 index 0000000..6bb277e --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime; + +import org.apache.flink.agents.api.Event; + +public class PythonEvent extends Event { + private final byte[] event; + private final String eventType; + + public PythonEvent(byte[] event, String eventType) { + super(); + this.event = event; + this.eventType = eventType; + } + + public byte[] getEvent() { + return event; + } + + public String getEventType() { + return eventType; + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/PythonRunnerContext.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/PythonRunnerContext.java new file mode 100644 index 0000000..e3e8e12 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/PythonRunnerContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.context; + +import org.apache.flink.agents.runtime.PythonEvent; +import org.apache.flink.util.Preconditions; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class is used to manage the execution context when interacting with Python functions, + * including collecting new events generated during the execution of actions. + */ +@NotThreadSafe +public class PythonRunnerContext { + private final List<PythonEvent> pendingEvents; + + public PythonRunnerContext() { + this.pendingEvents = new ArrayList<>(); + } + + public void sendEvent(String type, byte[] event) { + this.pendingEvents.add(new PythonEvent(event, type)); + } + + public List<PythonEvent> drainEvents() { + List<PythonEvent> list = new ArrayList<>(this.pendingEvents); + this.pendingEvents.clear(); + return list; + } + + public void checkNoPendingEvents() { + Preconditions.checkState( + this.pendingEvents.isEmpty(), "There are pending events remaining in the context."); + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java index 0e554c0..99429da 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManager.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; import org.apache.flink.python.env.AbstractPythonEnvironmentManager; import org.apache.flink.python.env.PythonDependencyInfo; -import org.apache.flink.python.env.PythonEnvironment; import pemja.core.PythonInterpreterConfig; import java.util.HashMap; @@ -43,7 +42,7 @@ public class PythonEnvironmentManager extends AbstractPythonEnvironmentManager { } @Override - public PythonEnvironment createEnvironment() { + public EmbeddedPythonEnvironment createEnvironment() { Map<String, String> env = new HashMap<>(getPythonEnv()); PythonInterpreterConfig.PythonInterpreterConfigBuilder interpreterConfigBuilder = diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java index e90f4c4..d3f86aa 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/env/PythonEnvironmentManagerTest.java @@ -151,8 +151,7 @@ public class PythonEnvironmentManagerTest { expected.put("python", "/usr/local/bin/python"); assertEquals(expected, environmentVariable); - EmbeddedPythonEnvironment env = - (EmbeddedPythonEnvironment) environmentManager.createEnvironment(); + EmbeddedPythonEnvironment env = environmentManager.createEnvironment(); assertEquals(expected, env.getEnv()); @@ -227,8 +226,7 @@ public class PythonEnvironmentManagerTest { File.separator, baseDir, PYTHON_FILES_DIR, "dir0", "test_dir")), false); - EmbeddedPythonEnvironment env = - (EmbeddedPythonEnvironment) environmentManager.createEnvironment(); + EmbeddedPythonEnvironment env = environmentManager.createEnvironment(); PythonInterpreterConfig config = env.getConfig(); assertArrayEquals(expectedPythonPath.split(File.pathSeparator), config.getPaths()); } @@ -265,8 +263,7 @@ public class PythonEnvironmentManagerTest { new File(String.join(File.separator, tmpBase, PYTHON_ARCHIVES_DIR, "py312")), true); - EmbeddedPythonEnvironment env = - (EmbeddedPythonEnvironment) environmentManager.createEnvironment(); + EmbeddedPythonEnvironment env = environmentManager.createEnvironment(); assertEquals(expected, env.getEnv());