This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push: new fd6ad26 [API][Python] Support DataStream / Table as input / output of Flink Agents in Python (#48) fd6ad26 is described below commit fd6ad26ce329bfe70d6a3a4a02ae9346fb6dbe48 Author: Wenjin Xie <166717626+wenjin...@users.noreply.github.com> AuthorDate: Fri Jul 11 16:49:59 2025 +0800 [API][Python] Support DataStream / Table as input / output of Flink Agents in Python (#48) --- .licenserc.yaml | 1 + python/flink_agents/api/event.py | 5 + python/flink_agents/api/execution_enviroment.py | 80 -------- python/flink_agents/api/execution_environment.py | 172 ++++++++++++++++ python/flink_agents/api/tests/test_event.py | 4 + python/flink_agents/examples/agent_example.py | 27 +-- .../integrate_datastream_with_agent_example.py | 89 ++++++++ .../examples/integrate_table_with_agent_example.py | 119 +++++++++++ python/flink_agents/examples/my_agent.py | 98 +++++++++ .../flink_agents/examples/resources/input_data.txt | 10 + python/flink_agents/plan/agent_plan.py | 6 +- python/flink_agents/plan/tests/test_agent_plan.py | 4 +- .../runtime/local_execution_environment.py | 110 ++++++++-- python/flink_agents/runtime/local_runner.py | 2 +- python/flink_agents/runtime/python_java_utils.py | 32 +-- .../runtime/remote_execution_environment.py | 223 +++++++++++++++++++++ .../tests/test_local_execution_environment.py | 13 +- python/requirements/test_requirements.txt | 1 + 18 files changed, 863 insertions(+), 133 deletions(-) diff --git a/.licenserc.yaml b/.licenserc.yaml index 673b1ac..0e0e64e 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -26,6 +26,7 @@ header: - 'NOTICE' - '.gitignore' - '**/*.json' + - '**/*.txt' comment: on-failure dependency: files: diff --git a/python/flink_agents/api/event.py b/python/flink_agents/api/event.py index 01084f5..4f85648 100644 --- a/python/flink_agents/api/event.py +++ b/python/flink_agents/api/event.py @@ -20,6 +20,7 @@ from typing import Any from uuid import UUID, uuid4 from 
pydantic import BaseModel, Field, model_validator +from pyflink.common import Row class Event(BaseModel, ABC, extra="allow"): @@ -36,6 +37,10 @@ class Event(BaseModel, ABC, extra="allow"): @model_validator(mode='after') def validate_extra(self) -> 'Event': """Ensure init fields is serializable.""" + #TODO: support Event contains Row field be json serializable + for value in self.model_dump().values(): + if isinstance(value, Row): + return self self.model_dump_json() return self diff --git a/python/flink_agents/api/execution_enviroment.py b/python/flink_agents/api/execution_enviroment.py deleted file mode 100644 index ebabc05..0000000 --- a/python/flink_agents/api/execution_enviroment.py +++ /dev/null @@ -1,80 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################# -import importlib -from abc import ABC, abstractmethod -from typing import Any, Dict, List - -from flink_agents.api.agent import Agent - - -class AgentsExecutionEnvironment(ABC): - """Base class for agent execution environment.""" - - @staticmethod - def get_execution_environment(**kwargs: Dict[str, Any]) -> 'AgentsExecutionEnvironment': - """Get agents execution environment. - - Currently, this method only returns LocalExecutionEnvironment. After - implement other AgentsExecutionEnvironments, this method will return - appropriate environment according to configuration. - - Returns: - ------- - AgentsExecutionEnvironment - Environment for agent execution. - """ - return importlib.import_module( - "flink_agents.runtime.local_execution_environment" - ).get_execution_environment(**kwargs) - - @abstractmethod - def from_list(self, input: List[Dict[str, Any]]) -> 'AgentsExecutionEnvironment': - """Set input for agents. Used for local execution. - - Parameters - ---------- - input : list - Receive a list as input. The element in the list should be a dict like - {'key': Any, 'value': Any} or {'value': Any} , extra field will be ignored. - """ - - @abstractmethod - def apply(self, agent: Agent) -> 'AgentsExecutionEnvironment': - """Set agent of execution environment. - - Parameters - ---------- - agent : Agent - The agent user defined to run in execution environment. - """ - - @abstractmethod - def to_list(self) -> List[Dict[str, Any]]: - """Get outputs of agent execution. Used for local execution. - - The element in the list is a dict like {'key': output}. - - Returns: - ------- - list - Outputs of agent execution. 
- """ - - @abstractmethod - def execute(self) -> None: - """Execute agents.""" diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py new file mode 100644 index 0000000..e6d9efd --- /dev/null +++ b/python/flink_agents/api/execution_environment.py @@ -0,0 +1,172 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import importlib +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Optional + +from pyflink.common import TypeInformation +from pyflink.datastream import DataStream, KeySelector, StreamExecutionEnvironment +from pyflink.table import Schema, StreamTableEnvironment, Table + +from flink_agents.api.agent import Agent + + +class AgentBuilder(ABC): + """Builder for integrating agent with input and output.""" + + @abstractmethod + def apply(self, agent: Agent) -> "AgentBuilder": + """Set agent of AgentBuilder. + + Parameters + ---------- + agent : Agent + The agent user defined to run in execution environment. 
+ """ + + @abstractmethod + def to_list(self) -> List[Dict[str, Any]]: + """Get output list of agent execution. + + The element in the list is a dict like {'key': output}. + + Returns: + ------- + list + Outputs of agent execution. + """ + + @abstractmethod + def to_datastream(self) -> DataStream: + """Get output datastream of agent execution. + + Returns: + ------- + DataStream + Output datastream of agent execution. + """ + + # TODO: auto generate output_type. + @abstractmethod + def to_table(self, schema: Schema, output_type: TypeInformation) -> Table: + """Get output table of agent execution. + + Parameters + ---------- + schema : Schema + Indicate schema of the output table. + output_type : TypeInformation + Indicate schema corresponding type information. + + Returns: + ------- + Table + Output table of agent execution. + """ + + +class AgentsExecutionEnvironment(ABC): + """Base class for agent execution environment.""" + + @staticmethod + def get_execution_environment( + env: Optional[StreamExecutionEnvironment] = None, **kwargs: Dict[str, Any] + ) -> "AgentsExecutionEnvironment": + """Get agents execution environment. + + Currently, user can run flink agents in ide using LocalExecutionEnvironment or + RemoteExecutionEnvironment. To distinguish which environment to use, when run + flink agents with pyflink datastream/table, user should pass flink + StreamExecutionEnvironment when get AgentsExecutionEnvironment. + + Returns: + ------- + AgentsExecutionEnvironment + Environment for agent execution. + """ + if env is None: + return importlib.import_module( + "flink_agents.runtime.local_execution_environment" + ).create_instance(env=env, **kwargs) + else: + return importlib.import_module( + "flink_agents.runtime.remote_execution_environment" + ).create_instance(env=env, **kwargs) + + @abstractmethod + def from_list(self, input: List[Dict[str, Any]]) -> AgentBuilder: + """Set input for agents. Used for local execution. 
+ + Parameters + ---------- + input : list + Receive a list as input. The element in the list should be a dict like + {'key': Any, 'value': Any} or {'value': Any} , extra field will be ignored. + + Returns: + ------- + AgentBuilder + A new builder to build an agent for specific input. + """ + + @abstractmethod + def from_datastream( + self, input: DataStream, key_selector: Optional[KeySelector] = None + ) -> AgentBuilder: + """Set input for agents. Used for remote execution. + + Parameters + ---------- + input : DataStream + Receive a DataStream as input. + key_selector : KeySelector + Extract key from each input record. + + Returns: + ------- + AgentBuilder + A new builder to build an agent for specific input. + """ + + @abstractmethod + def from_table( + self, + input: Table, + t_env: StreamTableEnvironment, + key_selector: Optional[KeySelector] = None, + ) -> AgentBuilder: + """Set input for agents. Used for remote execution. + + Parameters + ---------- + input : Table + Receive a Table as input. + t_env: StreamTableEnvironment + table environment supports convert Table to/from DataStream. + key_selector : KeySelector + Extract key from each input record. + + Returns: + ------- + AgentBuilder + A new builder to build an agent for specific input. 
+ """ + + @abstractmethod + def execute(self) -> None: + """Execute agent individually.""" diff --git a/python/flink_agents/api/tests/test_event.py b/python/flink_agents/api/tests/test_event.py index ebc7821..eeed674 100644 --- a/python/flink_agents/api/tests/test_event.py +++ b/python/flink_agents/api/tests/test_event.py @@ -20,6 +20,7 @@ from typing import Type import pytest from pydantic import ValidationError from pydantic_core import PydanticSerializationError +from pyflink.common import Row from flink_agents.api.event import Event, InputEvent, OutputEvent @@ -40,3 +41,6 @@ def test_event_setattr_non_serializable() -> None: #noqa D103 with pytest.raises(PydanticSerializationError): event.c = Type[InputEvent] +def test_input_event_ignore_row_unserializable() -> None: #noqa D103 + InputEvent(input=Row({"a": 1})) + diff --git a/python/flink_agents/examples/agent_example.py b/python/flink_agents/examples/agent_example.py index 94f1319..ebc814c 100644 --- a/python/flink_agents/examples/agent_example.py +++ b/python/flink_agents/examples/agent_example.py @@ -17,36 +17,38 @@ ################################################################################# from typing import Any +from flink_agents.api.agent import Agent from flink_agents.api.decorators import action from flink_agents.api.event import Event, InputEvent, OutputEvent -from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.execution_environment import AgentsExecutionEnvironment from flink_agents.api.runner_context import RunnerContext -from flink_agents.api.agent import Agent -class MyEvent(Event): #noqa D101 +class MyEvent(Event): # noqa D101 value: Any -#TODO: Replace this agent with more practical example. + +# TODO: Replace this agent with more practical example. class MyAgent(Agent): """An example of agent to show the basic usage. Currently, this agent doesn't really make sense, and it's mainly for developing validation. 
""" + @action(InputEvent) @staticmethod - def first_action(event: Event, ctx: RunnerContext): #noqa D102 + def first_action(event: Event, ctx: RunnerContext): # noqa D102 input = event.input - content = input + ' first_action' + content = input + " first_action" ctx.send_event(MyEvent(value=content)) ctx.send_event(OutputEvent(output=content)) @action(MyEvent) @staticmethod - def second_action(event: Event, ctx: RunnerContext): #noqa D102 + def second_action(event: Event, ctx: RunnerContext): # noqa D102 input = event.value - content = input + ' second_action' + content = input + " second_action" ctx.send_event(OutputEvent(output=content)) @@ -58,10 +60,11 @@ if __name__ == "__main__": output_list = env.from_list(input_list).apply(agent).to_list() - - input_list.append({'key': 'bob', 'value': 'The message from bob'}) - input_list.append({'k': 'john', 'v': 'The message from john'}) - input_list.append({'value': 'The message from unknown'}) # will automatically generate a new unique key + input_list.append({"key": "bob", "value": "The message from bob"}) + input_list.append({"k": "john", "v": "The message from john"}) + input_list.append( + {"value": "The message from unknown"} + ) # will automatically generate a new unique key env.execute() diff --git a/python/flink_agents/examples/integrate_datastream_with_agent_example.py b/python/flink_agents/examples/integrate_datastream_with_agent_example.py new file mode 100644 index 0000000..768e155 --- /dev/null +++ b/python/flink_agents/examples/integrate_datastream_with_agent_example.py @@ -0,0 +1,89 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pathlib import Path + +from pyflink.common import Duration, WatermarkStrategy +from pyflink.datastream import ( + KeySelector, + RuntimeExecutionMode, + StreamExecutionEnvironment, +) +from pyflink.datastream.connectors.file_system import FileSource, StreamFormat + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.examples.my_agent import DataStreamAgent, ItemData + + +class MyKeySelector(KeySelector): + """KeySelector for extracting key.""" + + def get_key(self, value: ItemData) -> int: + """Extract key from ItemData.""" + return value.id + + +current_dir = Path(__file__).parent + +# if this example raises exception "No module named 'flink_agents'", you could set +# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ +# site-packages) in this file. +if __name__ == "__main__": + env = StreamExecutionEnvironment.get_execution_environment() + + # should compile flink-agents jars before run this example. 
+ env.add_jars( + f"file:///{current_dir}/../../../runtime/target/flink-agents-runtime-0.1-SNAPSHOT.jar" + ) + env.add_jars( + f"file:///{current_dir}/../../../plan/target/flink-agents-plan-0.1-SNAPSHOT.jar" + ) + env.add_jars( + f"file:///{current_dir}/../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar" + ) + + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + + # currently, bounded source is not supported due to runtime implementation, so + # we use continuous file source here. + input_datastream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources" + ) + .monitor_continuously(Duration.of_minutes(1)) + .build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="streaming_agent_example", + ) + + deserialize_datastream = input_datastream.map( + lambda x: ItemData.model_validate_json(x) + ) + + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + output_datastream = ( + agents_env.from_datastream( + input=deserialize_datastream, key_selector=MyKeySelector() + ) + .apply(DataStreamAgent()) + .to_datastream() + ) + + output_datastream.print() + + agents_env.execute() diff --git a/python/flink_agents/examples/integrate_table_with_agent_example.py b/python/flink_agents/examples/integrate_table_with_agent_example.py new file mode 100644 index 0000000..07b60c0 --- /dev/null +++ b/python/flink_agents/examples/integrate_table_with_agent_example.py @@ -0,0 +1,119 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pathlib import Path + +from pyflink.common import Row +from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo +from pyflink.datastream import ( + KeySelector, + RuntimeExecutionMode, + StreamExecutionEnvironment, +) +from pyflink.table import ( + DataTypes, + Schema, + StreamTableEnvironment, + TableDescriptor, +) + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.examples.my_agent import TableAgent + +current_dir = Path(__file__).parent + + +class MyKeySelector(KeySelector): + """KeySelector for extracting key.""" + + def get_key(self, value: Row) -> int: + """Extract key from Row.""" + return value[0] + + +# if this example raises exception "No module named 'flink_agents'", you could set +# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/ +# site-packages) in this file. +if __name__ == "__main__": + env = StreamExecutionEnvironment.get_execution_environment() + + # should compile flink-agents jars before run this example. 
+ env.add_jars( + f"file:///{current_dir}/../../../runtime/target/flink-agents-runtime-0.1-SNAPSHOT.jar" + ) + env.add_jars( + f"file:///{current_dir}/../../../plan/target/flink-agents-plan-0.1-SNAPSHOT.jar" + ) + env.add_jars( + f"file:///{current_dir}/../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar" + ) + + env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + env.set_parallelism(1) + + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + + t_env.create_temporary_table( + "source", + TableDescriptor.for_connector("filesystem") + .schema( + Schema.new_builder() + .column("id", DataTypes.BIGINT()) + .column("review", DataTypes.STRING()) + .column("review_score", DataTypes.FLOAT()) + .build() + ) + .option("format", "json") + .option("path", f"file:///{current_dir}/resources") + .option("source.monitor-interval", "60s") + .build(), + ) + + table = t_env.from_path("source") + + agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + + output_type = ExternalTypeInfo( + RowTypeInfo( + [ + BasicTypeInfo.LONG_TYPE_INFO(), + BasicTypeInfo.STRING_TYPE_INFO(), + BasicTypeInfo.FLOAT_TYPE_INFO(), + ], + ["id", "review", "review_score"], + ) + ) + + schema = ( + Schema.new_builder() + .column("id", DataTypes.BIGINT()) + .column("review", DataTypes.STRING()) + .column("review_score", DataTypes.FLOAT()) + ).build() + + output_table = ( + agents_env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector()) + .apply(TableAgent()) + .to_table(schema=schema, output_type=output_type) + ) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("print").schema(schema).build(), + ) + + output_table.execute_insert("sink").wait() diff --git a/python/flink_agents/examples/my_agent.py b/python/flink_agents/examples/my_agent.py new file mode 100644 index 0000000..0c982fa --- /dev/null +++ b/python/flink_agents/examples/my_agent.py @@ -0,0 +1,98 @@ 
+################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import copy +from typing import Any + +from pydantic import BaseModel + +from flink_agents.api.agent import Agent +from flink_agents.api.decorators import action +from flink_agents.api.event import Event, InputEvent, OutputEvent +from flink_agents.api.runner_context import RunnerContext + + +class ItemData(BaseModel): + """Data model for storing item information. + + Attributes: + ---------- + id : int + Unique identifier of the item + review : str + The user review of the item + review_score: float + The review_score of the item + """ + + id: int + review: str + review_score: float + + +class MyEvent(Event): # noqa D101 + value: Any + + +class DataStreamAgent(Agent): + """Agent used for explaining integrating agents with DataStream. + + Because pemja will find action in this class when execute Agent, we can't + define this class directly in example.py for module name will be set + to __main__. 
+ """ + + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.input + content = copy.deepcopy(input) + content.review += " first action" + ctx.send_event(MyEvent(value=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.value + content = copy.deepcopy(input) + content.review += " second action" + ctx.send_event(OutputEvent(output=content)) + + +class TableAgent(Agent): + """Agent used for explaining integrating agents with Table. + + Because pemja will find action in this class when execute Agent, we can't + define this class directly in example.py for module name will be set + to __main__. + """ + + @action(InputEvent) + @staticmethod + def first_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.input + content = input + content["review"] += " first action" + ctx.send_event(MyEvent(value=content)) + + @action(MyEvent) + @staticmethod + def second_action(event: Event, ctx: RunnerContext): # noqa D102 + input = event.value + content = input + content["review"] += " second action" + ctx.send_event(OutputEvent(output=content)) diff --git a/python/flink_agents/examples/resources/input_data.txt b/python/flink_agents/examples/resources/input_data.txt new file mode 100644 index 0000000..13efe8e --- /dev/null +++ b/python/flink_agents/examples/resources/input_data.txt @@ -0,0 +1,10 @@ +{"id":1,"review":"Great product! Works perfectly and lasts a long time.","review_score":3.0} +{"id":2,"review":"The item arrived damaged, and the packaging was poor.","review_score":5.0} +{"id":3,"review":"Highly satisfied with the performance and value for money.","review_score":7.0} +{"id":3,"review":"Not as good as expected. It stopped working after a week.","review_score":8.0} +{"id":1,"review":"Fast shipping and excellent customer service. Would buy again!","review_score":8.0} +{"id":2,"review":"Too complicated to set up. 
Instructions were unclear.","review_score":8.0} +{"id":2,"review":"Good quality, but overpriced for what it does.","review_score":8.0} +{"id":1,"review":"Exactly what I needed. Easy to use and very reliable.","review_score":8.0} +{"id":2,"review":"Worst purchase ever. Waste of money and time.","review_score":8.0} +{"id":2,"review":"Looks nice and functions well, but could be more durable.","review_score":8.0} \ No newline at end of file diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py index 5ea6197..e37d3a8 100644 --- a/python/flink_agents/plan/agent_plan.py +++ b/python/flink_agents/plan/agent_plan.py @@ -15,11 +15,13 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# +from typing import Dict, List + +from pydantic import BaseModel + from flink_agents.api.agent import Agent from flink_agents.plan.action import Action from flink_agents.plan.function import PythonFunction -from pydantic import BaseModel -from typing import Dict, List class AgentPlan(BaseModel): diff --git a/python/flink_agents/plan/tests/test_agent_plan.py b/python/flink_agents/plan/tests/test_agent_plan.py index a268770..6912044 100644 --- a/python/flink_agents/plan/tests/test_agent_plan.py +++ b/python/flink_agents/plan/tests/test_agent_plan.py @@ -20,12 +20,12 @@ from pathlib import Path import pytest +from flink_agents.api.agent import Agent from flink_agents.api.decorators import action from flink_agents.api.event import Event, InputEvent, OutputEvent from flink_agents.api.runner_context import RunnerContext -from flink_agents.api.agent import Agent -from flink_agents.plan.function import PythonFunction from flink_agents.plan.agent_plan import AgentPlan +from flink_agents.plan.function import PythonFunction class TestAgent(Agent): #noqa D101 diff --git a/python/flink_agents/runtime/local_execution_environment.py
b/python/flink_agents/runtime/local_execution_environment.py index 119354a..e2865f1 100644 --- a/python/flink_agents/runtime/local_execution_environment.py +++ b/python/flink_agents/runtime/local_execution_environment.py @@ -17,50 +17,97 @@ ################################################################################# from typing import Any, Dict, List -from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from pyflink.common import TypeInformation +from pyflink.datastream import DataStream, KeySelector, StreamExecutionEnvironment +from pyflink.table import Schema, StreamTableEnvironment, Table + from flink_agents.api.agent import Agent +from flink_agents.api.execution_environment import ( + AgentBuilder, + AgentsExecutionEnvironment, +) from flink_agents.runtime.local_runner import LocalRunner -class LocalExecutionEnvironment(AgentsExecutionEnvironment): - """Implementation of AgentsExecutionEnvironment for local execution environment.""" +class LocalAgentBuilder(AgentBuilder): + """LocalAgentBuilder for building agent instance.""" + __env: "LocalExecutionEnvironment" __input: List[Dict[str, Any]] __output: List[Any] __runner: LocalRunner = None __executed: bool = False - def __init__(self) -> None: + def __init__( + self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]] + ) -> None: """Init empty output list.""" - self.__output = [] - - def from_list(self, input: list) -> 'AgentsExecutionEnvironment': - """Set input list of execution environment.""" + self.__env = env self.__input = input - return self + self.__output = [] - def apply(self, agent: Agent) -> 'AgentsExecutionEnvironment': + def apply(self, agent: Agent) -> AgentBuilder: """Create local runner to execute given agent. - Doesn't support apply multiple agents. + Doesn't support apply multiple Agents. """ if self.__runner is not None: - err_msg = "LocalExecutionEnvironment doesn't support apply multiple agents." 
+ err_msg = "LocalAgentBuilder doesn't support apply multiple agents." raise RuntimeError(err_msg) self.__runner = LocalRunner(agent) + self.__env.set_agent(self.__input, self.__output, self.__runner) return self - def to_list(self) -> list: + def to_list(self) -> List[Dict[str, Any]]: """Get output list of execution environment.""" return self.__output - def execute(self) -> None: - """Execute agents. + def to_datastream(self) -> DataStream: + """Get output DataStream of agent execution. - Doesn't support execute multiple times. + This method is not supported for LocalAgentBuilder. """ + msg = "LocalAgentBuilder does not support to_datastream." + raise NotImplementedError(msg) + + def to_table(self, schema: Schema, output_type: TypeInformation) -> Table: + """Get output Table of agent execution. + + This method is not supported for LocalAgentBuilder. + """ + msg = "LocalAgentBuilder does not support to_table." + raise NotImplementedError(msg) + + +class LocalExecutionEnvironment(AgentsExecutionEnvironment): + """Implementation of AgentsExecutionEnvironment for local execution environment.""" + + __input: List[Dict[str, Any]] = None + __output: List[Any] = None + __runner: LocalRunner = None + __executed: bool = False + + def from_list(self, input: list) -> LocalAgentBuilder: + """Set input list of execution environment.""" + if self.__input is not None: + err_msg = "LocalExecutionEnvironment doesn't support call from_list multiple times." + raise RuntimeError(err_msg) + + self.__input = input + return LocalAgentBuilder(env=self, input=input) + + def set_agent(self, input: list, output: list, runner: LocalRunner) -> None: + """Set agent input, output and runner.""" + self.__input = input + self.__runner = runner + self.__output = output + + def execute(self) -> None: + """Execute agent individually.""" if self.__executed: - err_msg = "LocalExecutionEnvironment doesn't support execute multiple times." 
+ err_msg = ( + "LocalExecutionEnvironment doesn't support execute multiple times." + ) raise RuntimeError(err_msg) self.__executed = True for input in self.__input: @@ -69,12 +116,37 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment): for output in outputs: self.__output.append(output) + def from_datastream( + self, input: DataStream, key_selector: KeySelector = None + ) -> AgentBuilder: + """Set input DataStream of agent execution. + + This method is not supported for local execution environments. + """ + msg = "LocalExecutionEnvironment does not support from_datastream." + raise NotImplementedError(msg) + + def from_table( + self, + input: Table, + t_env: StreamTableEnvironment, + key_selector: KeySelector = None, + ) -> AgentBuilder: + """Set input Table of agent execution. + + This method is not supported for local execution environments. + """ + msg = "LocalExecutionEnvironment does not support from_table." + raise NotImplementedError(msg) + -def get_execution_environment(**kwargs: Dict[str, Any]) -> AgentsExecutionEnvironment: - """Factory function to create a local agents execution environment. +def create_instance(env: StreamExecutionEnvironment, **kwargs: Dict[str, Any]) -> AgentsExecutionEnvironment: + """Factory function to create a remote agents execution environment. Parameters ---------- + env : StreamExecutionEnvironment + Flink job execution environment. **kwargs : Dict[str, Any] The dict of parameters to configure the execution environment. 
diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py index 8149c2d..dfbb582 100644 --- a/python/flink_agents/runtime/local_runner.py +++ b/python/flink_agents/runtime/local_runner.py @@ -22,9 +22,9 @@ from typing import Any, Dict, List from typing_extensions import override +from flink_agents.api.agent import Agent from flink_agents.api.event import Event, InputEvent, OutputEvent from flink_agents.api.runner_context import RunnerContext -from flink_agents.api.agent import Agent from flink_agents.plan.agent_plan import AgentPlan from flink_agents.runtime.agent_runner import AgentRunner diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 0abd160..a4bf1b8 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -1,22 +1,24 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# from typing import Any import cloudpickle + from flink_agents.api.event import InputEvent @@ -30,4 +32,4 @@ def wrap_to_input_event(bytesObject: bytes) -> bytes: def get_output_from_output_event(bytesObject: bytes) -> Any: """Get output data from OutputEvent and serialize.""" - return cloudpickle.dumps(convert_to_python_object(bytesObject).output) \ No newline at end of file + return cloudpickle.dumps(convert_to_python_object(bytesObject).output) diff --git a/python/flink_agents/runtime/remote_execution_environment.py b/python/flink_agents/runtime/remote_execution_environment.py new file mode 100644 index 0000000..3f6aae1 --- /dev/null +++ b/python/flink_agents/runtime/remote_execution_environment.py @@ -0,0 +1,223 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import Any, Dict, List, Optional + +import cloudpickle +from pyflink.common import TypeInformation +from pyflink.common.typeinfo import ( + PickledBytesTypeInfo, +) +from pyflink.datastream import ( + DataStream, + KeyedStream, + KeySelector, + StreamExecutionEnvironment, +) +from pyflink.table import Schema, StreamTableEnvironment, Table +from pyflink.util.java_utils import invoke_method + +from flink_agents.api.agent import Agent +from flink_agents.api.execution_environment import ( + AgentBuilder, + AgentsExecutionEnvironment, +) +from flink_agents.plan.agent_plan import AgentPlan + + +class RemoteAgentBuilder(AgentBuilder): + """RemoteAgentBuilder for integrating datastream/table and agent.""" + + __input: DataStream + __agent_plan: AgentPlan = None + __output: DataStream = None + __t_env: StreamTableEnvironment + + def __init__( + self, input: DataStream, t_env: Optional[StreamTableEnvironment] = None + ) -> None: + """Init method of RemoteAgentBuilder.""" + self.__input = input + self.__t_env = t_env + + def apply(self, agent: Agent) -> "AgentBuilder": + """Set agent of execution environment. + + Parameters + ---------- + agent : Agent + The agent user defined to run in execution environment. 
+ """ + if self.__agent_plan is not None: + err_msg = "RemoteAgentBuilder doesn't support apply multiple agents yet." + raise RuntimeError(err_msg) + self.__agent_plan = AgentPlan.from_agent(agent) + return self + + def to_datastream( + self, output_type: Optional[TypeInformation] = None + ) -> DataStream: + """Get output datastream of agent execution. + + Returns: + ------- + DataStream + Output datastream of agent execution. + """ + if self.__agent_plan is None: + err_msg = "Must apply agent before call to_datastream/to_table." + raise RuntimeError(err_msg) + + # return the same output datastream when call to_datastream multiple. + if self.__output is None: + j_data_stream_output = invoke_method( + None, + "org.apache.flink.agents.runtime.CompileUtils", + "connectToAgent", + [ + self.__input._j_data_stream, + self.__agent_plan.model_dump_json(serialize_as_any=True), + ], + [ + "org.apache.flink.streaming.api.datastream.KeyedStream", + "java.lang.String", + ], + ) + output_stream = DataStream(j_data_stream_output) + self.__output = output_stream.map( + lambda x: cloudpickle.loads(x), output_type=output_type + ) + return self.__output + + def to_table(self, schema: Schema, output_type: TypeInformation) -> Table: + """Get output Table of agent execution. + + Parameters + ---------- + schema : Schema + Indicate schema of the output table. + output_type : TypeInformation + Indicate schema corresponding type information. + + Returns: + ------- + Table + Output Table of agent execution. + """ + return self.__t_env.from_data_stream(self.to_datastream(output_type), schema) + + def to_list(self) -> List[Dict[str, Any]]: + """Get output list of agent execution. + + This method is not supported for remote execution environments. + """ + msg = "RemoteAgentBuilder does not support to_list." 
+ raise NotImplementedError(msg) + + +class RemoteExecutionEnvironment(AgentsExecutionEnvironment): + """Implementation of AgentsExecutionEnvironment for execution with DataStream.""" + + __env: StreamExecutionEnvironment + + def __init__(self, env: StreamExecutionEnvironment) -> None: + """Init method of RemoteExecutionEnvironment.""" + self.__env = env + + @staticmethod + def __process_input_datastream( + input: DataStream, key_selector: Optional[KeySelector] = None + ) -> KeyedStream: + if isinstance(input, KeyedStream): + return input + else: + if key_selector is None: + msg = "KeySelector must be provided." + raise RuntimeError(msg) + input = input.key_by(key_selector) + return input + + def from_datastream( + self, input: DataStream, key_selector: KeySelector = None + ) -> RemoteAgentBuilder: + """Set input datastream of agent. + + Parameters + ---------- + input : DataStream + Receive a DataStream as input. + key_selector : KeySelector + Extract key from each input record, must not be None when input is + not KeyedStream. + """ + input = self.__process_input_datastream(input, key_selector) + + return RemoteAgentBuilder(input=input) + + def from_table( + self, + input: Table, + t_env: StreamTableEnvironment, + key_selector: Optional[KeySelector] = None, + ) -> AgentBuilder: + """Set input Table of agent. + + Parameters + ---------- + input : Table + Receive a Table as input. + t_env: StreamTableEnvironment + table environment supports convert table to/from datastream. + key_selector : KeySelector + Extract key from each input record. + """ + input = t_env.to_data_stream(table=input) + + input = input.map(lambda x: x, output_type=PickledBytesTypeInfo()) + + input = self.__process_input_datastream(input, key_selector) + return RemoteAgentBuilder(input=input, t_env=t_env) + + def from_list(self, input: List[Dict[str, Any]]) -> "AgentsExecutionEnvironment": + """Set input list of agent execution. + + This method is not supported for remote execution environments. 
+ """ + msg = "RemoteExecutionEnvironment does not support from_list." + raise NotImplementedError(msg) + + def execute(self) -> None: + """Execute agent.""" + self.__env.execute() + + +def create_instance(env: StreamExecutionEnvironment, **kwargs: Dict[str, Any]) -> AgentsExecutionEnvironment: + """Factory function to create a remote agents execution environment. + + Parameters + ---------- + env : StreamExecutionEnvironment + Flink job execution environment. + **kwargs : Dict[str, Any] + The dict of parameters to configure the execution environment. + + Returns: + ------- + AgentsExecutionEnvironment + A configured agents execution environment instance. + """ + return RemoteExecutionEnvironment(env=env) diff --git a/python/flink_agents/runtime/tests/test_local_execution_environment.py b/python/flink_agents/runtime/tests/test_local_execution_environment.py index 6857f3e..4aaf521 100644 --- a/python/flink_agents/runtime/tests/test_local_execution_environment.py +++ b/python/flink_agents/runtime/tests/test_local_execution_environment.py @@ -17,11 +17,11 @@ ################################################################################# import pytest +from flink_agents.api.agent import Agent from flink_agents.api.decorators import action from flink_agents.api.event import Event, InputEvent, OutputEvent -from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment +from flink_agents.api.execution_environment import AgentsExecutionEnvironment from flink_agents.api.runner_context import RunnerContext -from flink_agents.api.agent import Agent class TestAgent1(Agent): # noqa: D101 @@ -80,3 +80,12 @@ def test_local_execution_environment_execute_multi_times() -> None: # noqa: D103 env.execute() with pytest.raises(RuntimeError): env.execute() + +def test_local_execution_environment_call_from_list_twice() -> None: # noqa: D103 + env = AgentsExecutionEnvironment.get_execution_environment() + + input_list = [] + + env.from_list(input_list) + with 
pytest.raises(RuntimeError): + env.from_list(input_list) diff --git a/python/requirements/test_requirements.txt b/python/requirements/test_requirements.txt index 11ddd75..52a14f7 100644 --- a/python/requirements/test_requirements.txt +++ b/python/requirements/test_requirements.txt @@ -16,3 +16,4 @@ wheel setuptools>=75.3 pytest==8.4.0 pydantic==2.11.4 +apache-flink==1.20.1