This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push: new b55ca1c [plan][runtime][python] Introduce Resource and Resource Provider in python (#70) b55ca1c is described below commit b55ca1c00e8a056255a573bf6251a701592739e5 Author: Wenjin Xie <166717626+wenjin...@users.noreply.github.com> AuthorDate: Fri Jul 18 17:50:47 2025 +0800 [plan][runtime][python] Introduce Resource and Resource Provider in python (#70) --- .../{runner_context.py => chat_models/__init__.py} | 20 --- .../chat_model.py} | 26 ++- python/flink_agents/api/decorators.py | 40 ++++- python/flink_agents/api/resource.py | 68 ++++++++ python/flink_agents/api/runner_context.py | 13 ++ .../api/{runner_context.py => tools/__init__.py} | 20 --- .../api/{runner_context.py => tools/tool.py} | 26 ++- python/flink_agents/plan/agent_plan.py | 180 +++++++++++++++++++-- python/flink_agents/plan/resource_provider.py | 143 ++++++++++++++++ .../plan/tests/resources/agent_plan.json | 16 ++ .../plan/tests/resources/resource_provider.json | 10 ++ python/flink_agents/plan/tests/test_agent_plan.py | 69 ++++++-- .../plan/tests/test_resource_provider.py | 68 ++++++++ .../runner_context.py => plan/tools/__init__.py} | 20 --- .../tools/function_tool.py} | 26 ++- .../flink_agents/runtime/flink_runner_context.py | 17 +- python/flink_agents/runtime/local_runner.py | 5 + .../runtime/tests/test_get_resource_in_action.py | 93 +++++++++++ .../runtime/operator/ActionExecutionOperator.java | 6 +- .../runtime/python/utils/PythonActionExecutor.java | 6 +- 20 files changed, 733 insertions(+), 139 deletions(-) diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/chat_models/__init__.py similarity index 66% copy from python/flink_agents/api/runner_context.py copy to python/flink_agents/api/chat_models/__init__.py index 05b053d..e154fad 100644 --- a/python/flink_agents/api/runner_context.py +++ b/python/flink_agents/api/chat_models/__init__.py @@ -15,23 +15,3 @@ # See the License for the specific 
language governing permissions and # limitations under the License. ################################################################################# -from abc import ABC, abstractmethod - -from flink_agents.api.event import Event - - -class RunnerContext(ABC): - """Abstract base class providing context for agent execution. - - This context provides access to event handling. - """ - - @abstractmethod - def send_event(self, event: Event) -> None: - """Send an event to the agent for processing. - - Parameters - ---------- - event : Event - The event to be processed by the agent system. - """ diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/chat_models/chat_model.py similarity index 67% copy from python/flink_agents/api/runner_context.py copy to python/flink_agents/api/chat_models/chat_model.py index 05b053d..b2fb985 100644 --- a/python/flink_agents/api/runner_context.py +++ b/python/flink_agents/api/chat_models/chat_model.py @@ -15,23 +15,21 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# -from abc import ABC, abstractmethod +from abc import ABC -from flink_agents.api.event import Event +from typing_extensions import override +from flink_agents.api.resource import Resource, ResourceType -class RunnerContext(ABC): - """Abstract base class providing context for agent execution. - This context provides access to event handling. - """ +#TODO: Complete BaseChatModel +class BaseChatModel(Resource, ABC): + """Base abstract class of all kinds of chat models. - @abstractmethod - def send_event(self, event: Event) -> None: - """Send an event to the agent for processing. + Currently, this class is empty just for testing purposes. + """ - Parameters - ---------- - event : Event - The event to be processed by the agent system. 
- """ + @classmethod + @override + def resource_type(cls) -> ResourceType: + return ResourceType.CHAT_MODEL diff --git a/python/flink_agents/api/decorators.py b/python/flink_agents/api/decorators.py index 6a0b8b8..1d8e3bb 100644 --- a/python/flink_agents/api/decorators.py +++ b/python/flink_agents/api/decorators.py @@ -38,13 +38,49 @@ def action(*listen_events: Tuple[Type[Event], ...]) -> Callable: AssertionError If no events are provided to listen to. """ - assert len(listen_events) > 0, 'action must have at least one event type to listen to' + assert len(listen_events) > 0, ( + "action must have at least one event type to listen to" + ) for event in listen_events: - assert issubclass(event, Event), 'action must only listen to event types.' + assert issubclass(event, Event), "action must only listen to event types." def decorator(func: Callable) -> Callable: func._listen_events = listen_events return func return decorator + + +def chat_model(func: Callable) -> Callable: + """Decorator for marking a function declaring a chat model. + + Parameters + ---------- + func : Callable + Function to be decorated. + + Returns: + ------- + Callable + Decorator function that marks the target function as declaring a chat model. + """ + func._is_chat_model = True + return func + + +def tool(func: Callable) -> Callable: + """Decorator for marking a function declaring a tool. + + Parameters + ---------- + func : Callable + Function to be decorated. + + Returns: + ------- + Callable + Decorator function that marks the target function as declaring a tool. + """ + func._is_tool = True + return func diff --git a/python/flink_agents/api/resource.py b/python/flink_agents/api/resource.py new file mode 100644 index 0000000..30b440d --- /dev/null +++ b/python/flink_agents/api/resource.py @@ -0,0 +1,68 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from abc import ABC, abstractmethod +from enum import Enum + +from pydantic import BaseModel, model_validator + + +class ResourceType(Enum): + """Type enum of resource. + + Currently, only support chat_models and tools. + """ + + CHAT_MODEL = "chat_model" + TOOL = "tool" + # EMBEDDING_MODEL = "embedding_model" + # PROMPT = "prompt" + # VECTOR_STORE = "vector_store" + # MCP_SERVER = "mcp_server" + + +class Resource(BaseModel, ABC): + """Base abstract class of all kinds of resources, includes chat model, + prompt, tools and so on. + + Resource extends BaseModel only for decreasing the complexity of attribute + declaration of subclasses; this does not mean a Resource object is serializable. + + Attributes: + ---------- + name : str + The name of the resource. + type : ResourceType + The type of the resource. 
+ """ + + name: str + + @classmethod + @abstractmethod + def resource_type(cls) -> ResourceType: + """Return resource type of class.""" + + +class SerializableResource(Resource, ABC): + """Resource which is serializable.""" + + @model_validator(mode="after") + def validate_serializable(self) -> "SerializableResource": + """Ensure resource is serializable.""" + self.model_dump_json() + return self diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/runner_context.py index 05b053d..f36472a 100644 --- a/python/flink_agents/api/runner_context.py +++ b/python/flink_agents/api/runner_context.py @@ -18,6 +18,7 @@ from abc import ABC, abstractmethod from flink_agents.api.event import Event +from flink_agents.api.resource import Resource, ResourceType class RunnerContext(ABC): @@ -35,3 +36,15 @@ class RunnerContext(ABC): event : Event The event to be processed by the agent system. """ + + @abstractmethod + def get_resource(self, name: str, type: ResourceType) -> Resource: + """Get resource from context. + + Parameters + ---------- + name : str + The name of the resource. + type : ResourceType + The type of the resource. + """ diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/tools/__init__.py similarity index 66% copy from python/flink_agents/api/runner_context.py copy to python/flink_agents/api/tools/__init__.py index 05b053d..e154fad 100644 --- a/python/flink_agents/api/runner_context.py +++ b/python/flink_agents/api/tools/__init__.py @@ -15,23 +15,3 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# -from abc import ABC, abstractmethod - -from flink_agents.api.event import Event - - -class RunnerContext(ABC): - """Abstract base class providing context for agent execution. - - This context provides access to event handling. 
- """ - - @abstractmethod - def send_event(self, event: Event) -> None: - """Send an event to the agent for processing. - - Parameters - ---------- - event : Event - The event to be processed by the agent system. - """ diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/tools/tool.py similarity index 67% copy from python/flink_agents/api/runner_context.py copy to python/flink_agents/api/tools/tool.py index 05b053d..566fa10 100644 --- a/python/flink_agents/api/runner_context.py +++ b/python/flink_agents/api/tools/tool.py @@ -15,23 +15,21 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# -from abc import ABC, abstractmethod +from abc import ABC -from flink_agents.api.event import Event +from typing_extensions import override +from flink_agents.api.resource import ResourceType, SerializableResource -class RunnerContext(ABC): - """Abstract base class providing context for agent execution. - This context provides access to event handling. - """ +#TODO: Complete BaseTool +class BaseTool(SerializableResource, ABC): + """Base abstract class of all kinds of tools. - @abstractmethod - def send_event(self, event: Event) -> None: - """Send an event to the agent for processing. + Currently, this class is empty just for testing purposes + """ - Parameters - ---------- - event : Event - The event to be processed by the agent system. - """ + @classmethod + @override + def resource_type(cls) -> ResourceType: + return ResourceType.TOOL diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py index e37d3a8..cb77f09 100644 --- a/python/flink_agents/plan/agent_plan.py +++ b/python/flink_agents/plan/agent_plan.py @@ -15,13 +15,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################# -from typing import Dict, List +from typing import Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, field_serializer, model_validator from flink_agents.api.agent import Agent +from flink_agents.api.resource import Resource, ResourceType from flink_agents.plan.action import Action from flink_agents.plan.function import PythonFunction +from flink_agents.plan.resource_provider import ( + JavaResourceProvider, + JavaSerializableResourceProvider, + PythonResourceProvider, + PythonSerializableResourceProvider, + ResourceProvider, +) +from flink_agents.plan.tools.function_tool import FunctionTool class AgentPlan(BaseModel): @@ -33,9 +42,74 @@ class AgentPlan(BaseModel): Mapping of action names to actions actions_by_event : Dict[Type[Event], str] Mapping of event types to the list of actions name that listen to them. + resource_providers: ResourceProvider + Two level mapping of resource type to resource name to resource provider. 
""" + actions: Dict[str, Action] actions_by_event: Dict[str, List[str]] + resource_providers: Optional[Dict[ResourceType, Dict[str, ResourceProvider]]] = None + __resources: Dict[ResourceType, Dict[str, Resource]] = {} + + @field_serializer("resource_providers") + def __serialize_resource_providers( + self, providers: Dict[ResourceType, Dict[str, ResourceProvider]] + ) -> dict: + # append meta info to help deserialize resource providers + data = {} + for type in providers: + data[type] = {} + for name, provider in providers[type].items(): + data[type][name] = provider.model_dump() + if isinstance(provider, PythonResourceProvider): + data[type][name]["__resource_provider_type__"] = ( + "PythonResourceProvider" + ) + elif isinstance(provider, PythonSerializableResourceProvider): + data[type][name]["__resource_provider_type__"] = ( + "PythonSerializableResourceProvider" + ) + elif isinstance(provider, JavaResourceProvider): + data[type][name]["__resource_provider_type__"] = ( + "JavaResourceProvider" + ) + elif isinstance(provider, JavaSerializableResourceProvider): + data[type][name]["__resource_provider_type__"] = ( + "JavaSerializableResourceProvider" + ) + return data + + @model_validator(mode="before") + def __custom_deserialize(self) -> "AgentPlan": + if "resource_providers" in self: + providers = self["resource_providers"] + # restore exec from serialized json. 
+ if isinstance(providers, dict): + for type in providers: + for name, provider in providers[type].items(): + if isinstance(provider, dict): + provider_type = provider["__resource_provider_type__"] + if provider_type == "PythonResourceProvider": + self["resource_providers"][type][name] = ( + PythonResourceProvider.model_validate(provider) + ) + elif provider_type == "PythonSerializableResourceProvider": + self["resource_providers"][type][name] = ( + PythonSerializableResourceProvider.model_validate( + provider + ) + ) + elif provider_type == "JavaResourceProvider": + self["resource_providers"][type][name] = ( + JavaResourceProvider.model_validate(provider) + ) + elif provider_type == "JavaSerializableResourceProvider": + self["resource_providers"][type][name] = ( + JavaSerializableResourceProvider.model_validate( + provider + ) + ) + return self @staticmethod def from_agent(agent: Agent) -> "AgentPlan": @@ -49,7 +123,22 @@ class AgentPlan(BaseModel): if event_type not in actions_by_event: actions_by_event[event_type] = [] actions_by_event[event_type].append(action.name) - return AgentPlan(actions=actions, actions_by_event=actions_by_event) + + resource_providers = {} + for provider in _get_resource_providers(agent): + type = provider.type + if type not in resource_providers: + resource_providers[type] = {} + name = provider.name + assert name not in resource_providers[type], ( + f"Duplicate resource name: {name}" + ) + resource_providers[type][name] = provider + return AgentPlan( + actions=actions, + actions_by_event=actions_by_event, + resource_providers=resource_providers, + ) def get_actions(self, event_type: str) -> List[Action]: """Get actions that listen to the specified event type. @@ -66,6 +155,23 @@ class AgentPlan(BaseModel): """ return [self.actions[name] for name in self.actions_by_event[event_type]] + def get_resource(self, name: str, type: ResourceType) -> Resource: + """Get resource from agent plan. 
+ + Parameters + ---------- + name : str + The name of the resource. + type : ResourceType + The type of the resource. + """ + if type not in self.__resources: + self.__resources[type] = {} + if name not in self.__resources[type]: + resource_provider = self.resource_providers[type][name] + self.__resources[type][name] = resource_provider.provide() + return self.__resources[type][name] + def _get_actions(agent: Agent) -> List[Action]: """Extract all registered agent actions from an agent. @@ -82,12 +188,64 @@ def _get_actions(agent: Agent) -> List[Action]: """ actions = [] for name, value in agent.__class__.__dict__.items(): - if isinstance(value, staticmethod) and hasattr(value, '_listen_events'): - actions.append(Action(name=name, exec=PythonFunction.from_callable(value.__func__), - listen_event_types=[f'{event_type.__module__}.{event_type.__name__}' - for event_type in value._listen_events])) - elif callable(value) and hasattr(value, '_listen_events'): - actions.append(Action(name=name, exec=PythonFunction.from_callable(value), - listen_event_types=[f'{event_type.__module__}.{event_type.__name__}' - for event_type in value._listen_events])) + if isinstance(value, staticmethod) and hasattr(value, "_listen_events"): + actions.append( + Action( + name=name, + exec=PythonFunction.from_callable(value.__func__), + listen_event_types=[ + f"{event_type.__module__}.{event_type.__name__}" + for event_type in value._listen_events + ], + ) + ) + elif callable(value) and hasattr(value, "_listen_events"): + actions.append( + Action( + name=name, + exec=PythonFunction.from_callable(value), + listen_event_types=[ + f"{event_type.__module__}.{event_type.__name__}" + for event_type in value._listen_events + ], + ) + ) return actions + + +def _get_resource_providers(agent: Agent) -> List[ResourceProvider]: + resource_providers = [] + for name, value in agent.__class__.__dict__.items(): + if hasattr(value, "_is_chat_model"): + if isinstance(value, staticmethod): + value = 
value.__func__ + + if callable(value): + clazz, kwargs = value() + module = clazz.__module__ + provider = PythonResourceProvider( + name=name, + type=clazz.resource_type(), + module=module, + clazz=clazz.__name__, + kwargs=kwargs, + ) + resource_providers.append(provider) + if hasattr(value, "_is_tool"): + if isinstance(value, staticmethod): + value = value.__func__ + + if callable(value): + # TODO: support other tool type. + func = PythonFunction.from_callable(value) + tool = FunctionTool(name=name, func=func) + provider = PythonSerializableResourceProvider( + name=tool.name, + type=tool.resource_type(), + serialized=tool.model_dump(), + module=tool.__module__, + clazz=tool.__class__.__name__, + resource=tool, + ) + resource_providers.append(provider) + return resource_providers diff --git a/python/flink_agents/plan/resource_provider.py b/python/flink_agents/plan/resource_provider.py new file mode 100644 index 0000000..f26e04a --- /dev/null +++ b/python/flink_agents/plan/resource_provider.py @@ -0,0 +1,143 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import importlib +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + +from pydantic import BaseModel + +from flink_agents.api.resource import ( + Resource, + ResourceType, + SerializableResource, +) + + +class ResourceProvider(BaseModel, ABC): + """Resource provider that carries resource meta to create + Resource object in runtime. + + Attributes: + ---------- + name : str + The name of the resource + type : ResourceType + The type of the resource + """ + + name: str + type: ResourceType + + @abstractmethod + def provide(self) -> Resource: + """Create resource in runtime.""" + + +class SerializableResourceProvider(ResourceProvider, ABC): + """Resource Provider that carries Resource object or serialized object. + + Attributes: + ---------- + module : str + The module name of the resource. + clazz : str + The class name of the resource. + """ + + module: str + clazz: str + + +class PythonResourceProvider(ResourceProvider): + """Python Resource provider that carries resource meta to create + Resource object in runtime. + + Attributes: + ---------- + module : str + The module name of the resource. + clazz : str + The class name of the resource. + kwargs : Dict[str, Any] + The initialization arguments of the resource. + """ + + module: str + clazz: str + kwargs: Dict[str, Any] + + def provide(self) -> Resource: + """Create resource in runtime.""" + module = importlib.import_module(self.module) + cls = getattr(module, self.clazz) + return cls(**self.kwargs) + + +class PythonSerializableResourceProvider(SerializableResourceProvider): + """Resource Provider that carries Resource object or serialized object. 
+ + Attributes: + ---------- + serialized : Dict[str, Any] + serialized resource object + resource : Optional[SerializableResource] + SerializableResource object + """ + + serialized: Dict[str, Any] + resource: Optional[SerializableResource] = None + + def provide(self) -> Resource: + """Get or deserialize resource in runtime.""" + if self.resource is None: + module = importlib.import_module(self.module) + clazz = getattr(module, self.clazz) + self.resource = clazz.model_validate(**self.serialized) + return self.resource + + +# TODO: implementation +class JavaResourceProvider(ResourceProvider): + """Represent Resource Provider declared by Java. + + Currently, this class only used for deserializing Java agent plan json + """ + + def provide(self) -> Resource: + """Create resource in runtime.""" + err_msg = ( + "Currently, flink-agents doesn't support create resource " + "by JavaResourceProvider in python." + ) + raise NotImplementedError(err_msg) + + +# TODO: implementation +class JavaSerializableResourceProvider(SerializableResourceProvider): + """Represent Serializable Resource Provider declared by Java. + + Currently, this class only used for deserializing Java agent plan json + """ + + def provide(self) -> Resource: + """Get or deserialize resource in runtime.""" + err_msg = ( + "Currently, flink-agents doesn't support create resource " + "by JavaSerializableResourceProvider in python." 
+ ) + raise NotImplementedError(err_msg) diff --git a/python/flink_agents/plan/tests/resources/agent_plan.json b/python/flink_agents/plan/tests/resources/agent_plan.json index 7930405..abbc3a4 100644 --- a/python/flink_agents/plan/tests/resources/agent_plan.json +++ b/python/flink_agents/plan/tests/resources/agent_plan.json @@ -32,5 +32,21 @@ "flink_agents.plan.tests.test_agent_plan.MyEvent": [ "second_action" ] + }, + "resource_providers": { + "chat_model": { + "mock": { + "name": "mock", + "type": "chat_model", + "module": "flink_agents.plan.tests.test_agent_plan", + "clazz": "MockChatModelImpl", + "kwargs": { + "name": "mock", + "host": "8.8.8.8", + "desc": "mock resource just for testing." + }, + "__resource_provider_type__": "PythonResourceProvider" + } + } } } \ No newline at end of file diff --git a/python/flink_agents/plan/tests/resources/resource_provider.json b/python/flink_agents/plan/tests/resources/resource_provider.json new file mode 100644 index 0000000..c5ce41e --- /dev/null +++ b/python/flink_agents/plan/tests/resources/resource_provider.json @@ -0,0 +1,10 @@ +{ + "name": "mock", + "type": "chat_model", + "module": "flink_agents.plan.tests.test_resource_provider", + "clazz": "MockChatModelImpl", + "kwargs": { + "host": "8.8.8.8", + "desc": "mock chat model" + } +} \ No newline at end of file diff --git a/python/flink_agents/plan/tests/test_agent_plan.py b/python/flink_agents/plan/tests/test_agent_plan.py index 6912044..a536e31 100644 --- a/python/flink_agents/plan/tests/test_agent_plan.py +++ b/python/flink_agents/plan/tests/test_agent_plan.py @@ -17,29 +17,32 @@ ################################################################################# import json from pathlib import Path +from typing import Any, Dict, Tuple, Type import pytest from flink_agents.api.agent import Agent -from flink_agents.api.decorators import action +from flink_agents.api.decorators import action, chat_model from flink_agents.api.event import Event, InputEvent, OutputEvent 
+from flink_agents.api.resource import Resource, ResourceType from flink_agents.api.runner_context import RunnerContext from flink_agents.plan.agent_plan import AgentPlan from flink_agents.plan.function import PythonFunction -class TestAgent(Agent): #noqa D101 +class TestAgent(Agent): # noqa D101 @action(InputEvent) @staticmethod - def increment(event: Event, ctx: RunnerContext) -> None: #noqa D102 + def increment(event: Event, ctx: RunnerContext) -> None: # noqa D102 value = event.input value += 1 ctx.send_event(OutputEvent(output=value)) -def test_from_agent(): #noqa D102 + +def test_from_agent(): # noqa D102 agent = TestAgent() agent_plan = AgentPlan.from_agent(agent) - event_type = f'{InputEvent.__module__}.{InputEvent.__name__}' + event_type = f"{InputEvent.__module__}.{InputEvent.__name__}" actions = agent_plan.get_actions(event_type) assert len(actions) == 1 action = actions[0] @@ -50,47 +53,81 @@ def test_from_agent(): #noqa D102 assert func.qualname == "TestAgent.increment" assert action.listen_event_types == [event_type] -class InvalidAgent(Agent): #noqa D101 + +class InvalidAgent(Agent): # noqa D101 @action(InputEvent) @staticmethod - def invalid_signature_action(event: Event) -> None: #noqa D102 + def invalid_signature_action(event: Event) -> None: # noqa D102 pass -def test_to_agent_invalid_signature() -> None: #noqa D103 + +def test_to_agent_invalid_signature() -> None: # noqa D103 agent = InvalidAgent() with pytest.raises(TypeError): AgentPlan.from_agent(agent) + class MyEvent(Event): """Event for testing purposes.""" -class MyAgent(Agent): # noqa: D101 + +class MockChatModelImpl(Resource): # noqa: D101 + host: str + desc: str + + @classmethod + def resource_type(cls) -> ResourceType: # noqa: D102 + return ResourceType.CHAT_MODEL + + def chat(self) -> str: + """For testing purposes.""" + return self.host + " " + self.desc + +class MyAgent(Agent): # noqa: D101 + @chat_model + @staticmethod + def mock() -> Tuple[Type[Resource], Dict[str, Any]]: # noqa: 
D102 + return MockChatModelImpl, { + "name": "mock", + "host": "8.8.8.8", + "desc": "mock resource just for testing.", + } + @action(InputEvent) @staticmethod - def first_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: D102 + def first_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: D102 pass @action(InputEvent, MyEvent) @staticmethod - def second_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: D102 + def second_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: D102 pass + @pytest.fixture(scope="module") -def agent_plan() -> AgentPlan: # noqa: D103 +def agent_plan() -> AgentPlan: # noqa: D103 return AgentPlan.from_agent(MyAgent()) + current_dir = Path(__file__).parent -def test_agent_plan_serialize(agent_plan: AgentPlan) -> None: # noqa: D103 + +def test_agent_plan_serialize(agent_plan: AgentPlan) -> None: # noqa: D103 json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4) - with Path.open(Path(f'{current_dir}/resources/agent_plan.json')) as f: + with Path.open(Path(f"{current_dir}/resources/agent_plan.json")) as f: expected_json = f.read() actual = json.loads(json_value) expected = json.loads(expected_json) assert actual == expected -def test_agent_plan_deserialize(agent_plan: AgentPlan) -> None: # noqa: D103 - with Path.open(Path(f'{current_dir}/resources/agent_plan.json')) as f: + +def test_agent_plan_deserialize(agent_plan: AgentPlan) -> None: # noqa: D103 + with Path.open(Path(f"{current_dir}/resources/agent_plan.json")) as f: expected_json = f.read() deserialized_agent_plan = AgentPlan.model_validate_json(expected_json) assert deserialized_agent_plan == agent_plan + +def test_get_resource() -> None: # noqa: D103 + agent_plan = AgentPlan.from_agent(MyAgent()) + mock = agent_plan.get_resource("mock", ResourceType.CHAT_MODEL) + assert mock.chat() == "8.8.8.8 mock resource just for testing." 
diff --git a/python/flink_agents/plan/tests/test_resource_provider.py b/python/flink_agents/plan/tests/test_resource_provider.py new file mode 100644 index 0000000..616cca2 --- /dev/null +++ b/python/flink_agents/plan/tests/test_resource_provider.py @@ -0,0 +1,68 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#################################################################################
+import json
+from pathlib import Path
+
+import pytest
+
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.plan.resource_provider import PythonResourceProvider, ResourceProvider
+
+current_dir = Path(__file__).parent
+
+
+class MockChatModelImpl(Resource): # noqa: D101
+    host: str
+    desc: str
+
+    @classmethod
+    def resource_type(cls) -> ResourceType: # noqa: D102
+        return ResourceType.CHAT_MODEL
+
+
+@pytest.fixture(scope="module")
+def resource_provider() -> ResourceProvider: # noqa: D103
+    return PythonResourceProvider(
+        name="mock",
+        type=MockChatModelImpl.resource_type(),
+        module=MockChatModelImpl.__module__,
+        clazz=MockChatModelImpl.__name__,
+        kwargs={"host": "8.8.8.8", "desc": "mock chat model"},
+    )
+
+
+def test_python_resource_provider_serialize( # noqa: D103
+    resource_provider: ResourceProvider,
+) -> None:
+    json_value = resource_provider.model_dump_json(serialize_as_any=True)
+    with Path.open(Path(f"{current_dir}/resources/resource_provider.json")) as f:
+        expected_json = f.read()
+    actual = json.loads(json_value)
+    expected = json.loads(expected_json)
+    assert actual == expected
+
+
+def test_python_resource_provider_deserialize( # noqa: D103
+    resource_provider: ResourceProvider,
+) -> None:
+    with Path.open(Path(f"{current_dir}/resources/resource_provider.json")) as f:
+        expected_json = f.read()
+    expected_resource_provider = PythonResourceProvider.model_validate_json(
+        expected_json
+    )
+    assert resource_provider == expected_resource_provider
diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/plan/tools/__init__.py
similarity index 66%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/plan/tools/__init__.py
index 05b053d..e154fad 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/plan/tools/__init__.py
@@ -15,23 +15,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #################################################################################
-from abc import ABC, abstractmethod
-
-from flink_agents.api.event import Event
-
-
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
-
-    This context provides access to event handling.
-    """
-
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
-
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/plan/tools/function_tool.py
similarity index 67%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/plan/tools/function_tool.py
index 05b053d..2847b90 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/plan/tools/function_tool.py
@@ -15,23 +15,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #################################################################################
-from abc import ABC, abstractmethod
+from typing import Any, Dict, Tuple

-from flink_agents.api.event import Event
+from flink_agents.api.tools.tool import BaseTool
+from flink_agents.plan.function import Function


-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
+#TODO: Complete FunctionTool
+class FunctionTool(BaseTool):
+    """Function tool.

-    This context provides access to event handling.
+    Currently, this class is just for testing purposes.
     """
-
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
-
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
+    func: Function
+    def call(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> Any:
+        """Call function."""
+        return self.func(*args, **kwargs)
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 980f70d..001735d 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -21,7 +21,9 @@ import cloudpickle
 from typing_extensions import override

 from flink_agents.api.event import Event
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
+from flink_agents.plan.agent_plan import AgentPlan


 class FlinkRunnerContext(RunnerContext):
@@ -30,7 +32,9 @@ class FlinkRunnerContext(RunnerContext):
     This context allows access to event handling.
     """

-    def __init__(self, j_runner_context: Any) -> None:
+    __agent_plan: AgentPlan
+
+    def __init__(self, j_runner_context: Any, agent_plan_json: str) -> None:
         """Initialize a flink runner context with the given java runner context.

         Parameters
@@ -39,6 +43,7 @@ class FlinkRunnerContext(RunnerContext):
             Java runner context used to synchronize data between Python and Java.
         """
         self._j_runner_context = j_runner_context
+        self.__agent_plan = AgentPlan.model_validate_json(agent_plan_json)

     @override
     def send_event(self, event: Event) -> None:
@@ -49,13 +54,17 @@ class FlinkRunnerContext(RunnerContext):
         event : Event
             The event to be processed by the agent system.
         """
+        class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
         try:
-            class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
             self._j_runner_context.sendEvent(class_path, cloudpickle.dumps(event))
         except Exception as e:
             err_msg = "Failed to send event " + class_path + " to runner context"
             raise RuntimeError(err_msg) from e

-def create_flink_runner_context(j_runner_context: Any) -> FlinkRunnerContext:
+    @override
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        return self.__agent_plan.get_resource(name, type)
+
+def create_flink_runner_context(j_runner_context: Any, agent_plan_json: str) -> FlinkRunnerContext:
     """Used to create a FlinkRunnerContext Python object in Pemja environment."""
-    return FlinkRunnerContext(j_runner_context)
+    return FlinkRunnerContext(j_runner_context, agent_plan_json)
diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py
index dfbb582..19bf795 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -24,6 +24,7 @@ from typing_extensions import override

 from flink_agents.api.agent import Agent
 from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.runtime.agent_runner import AgentRunner
@@ -91,6 +92,10 @@ class LocalRunnerContext(RunnerContext):
         logger.info("key: %s, send_event: %s", self.__key, event)
         self.events.append(event)

+    @override
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        return self.__agent_plan.get_resource(name, type)
+

 class LocalRunner(AgentRunner):
     """Agent runner implementation for local execution, which is
diff --git a/python/flink_agents/runtime/tests/test_get_resource_in_action.py b/python/flink_agents/runtime/tests/test_get_resource_in_action.py
new file mode 100644
index 0000000..89d5cb7
--- /dev/null
+++ b/python/flink_agents/runtime/tests/test_get_resource_in_action.py
@@ -0,0 +1,93 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, Dict, Tuple, Type
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.chat_models.chat_model import BaseChatModel
+from flink_agents.api.decorators import action, chat_model, tool
+from flink_agents.api.event import InputEvent, OutputEvent
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.api.runner_context import RunnerContext
+
+
+class MockChatModelImpl(BaseChatModel): # noqa: D101
+    host: str
+    desc: str
+
+    @classmethod
+    def resource_type(cls) -> ResourceType: # noqa: D102
+        return ResourceType.CHAT_MODEL
+
+    def chat(self) -> str:
+        """For testing purposes."""
+        return self.host + " " + self.desc
+
+
+class MyAgent(Agent): # noqa: D101
+    @chat_model
+    @staticmethod
+    def mock_chat_model() -> Tuple[Type[Resource], Dict[str, Any]]: # noqa: D102
+        return MockChatModelImpl, {
+            "name": "mock_chat_model",
+            "host": "8.8.8.8",
+            "desc": "mock chat model just for testing.",
+        }
+
+    @tool
+    @staticmethod
+    def mock_tool(input: str) -> str: # noqa: D102
+        return input + " mock tools just for testing."
+
+    @action(InputEvent)
+    @staticmethod
+    def mock_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: D102
+        input = event.input
+        mock_chat_model = ctx.get_resource(
+            type=ResourceType.CHAT_MODEL, name="mock_chat_model"
+        )
+        mock_tool = ctx.get_resource(type=ResourceType.TOOL, name="mock_tool")
+        ctx.send_event(
+            OutputEvent(
+                output=input
+                + " "
+                + mock_chat_model.chat()
+                + " "
+                + mock_tool.call("call")
+            )
+        )
+
+
+def test_get_resource_in_action() -> None: # noqa: D103
+    env = AgentsExecutionEnvironment.get_execution_environment()
+
+    input_list = []
+    agent = MyAgent()
+
+    output_list = env.from_list(input_list).apply(agent).to_list()
+
+    input_list.append({"key": "bob", "value": "the first message."})
+
+    env.execute()
+
+    assert output_list == [
+        {
+            "bob": "the first message. 8.8.8.8 mock chat model "
+            "just for testing. call mock tools just for testing."
+        }
+    ]
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index c749624..b85cf0b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 import org.apache.flink.agents.runtime.utils.EventUtil;
 import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -172,7 +173,10 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
                                     .getTmpDirectories(),
                             new HashMap<>(System.getenv()),
                             getRuntimeContext().getJobInfo().getJobId());
-            pythonActionExecutor = new PythonActionExecutor(pythonEnvironmentManager);
+            pythonActionExecutor =
+                    new PythonActionExecutor(
+                            pythonEnvironmentManager,
+                            new ObjectMapper().writeValueAsString(agentPlan));
             pythonActionExecutor.open();
         }
     }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 8a37ea4..b67e637 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -47,12 +47,14 @@ public class PythonActionExecutor {
     private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = "flink_runner_context";

     private final PythonEnvironmentManager environmentManager;
+    private final String agentPlanJson;
     private final PythonRunnerContextImpl runnerContext;

     private PythonInterpreter interpreter;

-    public PythonActionExecutor(PythonEnvironmentManager environmentManager) {
+    public PythonActionExecutor(PythonEnvironmentManager environmentManager, String agentPlanJson) {
         this.environmentManager = environmentManager;
+        this.agentPlanJson = agentPlanJson;
         this.runnerContext = new PythonRunnerContextImpl();
     }

@@ -65,7 +67,7 @@ public class PythonActionExecutor {

         // TODO: remove the set and get runner context after updating pemja to version 0.5.3
         Object pythonRunnerContextObject =
-                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext);
+                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext, agentPlanJson);
         interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject);
     }
