This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new b55ca1c  [plan][runtime][python] Introduce Resource and Resource 
Provider in python (#70)
b55ca1c is described below

commit b55ca1c00e8a056255a573bf6251a701592739e5
Author: Wenjin Xie <166717626+wenjin...@users.noreply.github.com>
AuthorDate: Fri Jul 18 17:50:47 2025 +0800

    [plan][runtime][python] Introduce Resource and Resource Provider in python 
(#70)
---
 .../{runner_context.py => chat_models/__init__.py} |  20 ---
 .../chat_model.py}                                 |  26 ++-
 python/flink_agents/api/decorators.py              |  40 ++++-
 python/flink_agents/api/resource.py                |  68 ++++++++
 python/flink_agents/api/runner_context.py          |  13 ++
 .../api/{runner_context.py => tools/__init__.py}   |  20 ---
 .../api/{runner_context.py => tools/tool.py}       |  26 ++-
 python/flink_agents/plan/agent_plan.py             | 180 +++++++++++++++++++--
 python/flink_agents/plan/resource_provider.py      | 143 ++++++++++++++++
 .../plan/tests/resources/agent_plan.json           |  16 ++
 .../plan/tests/resources/resource_provider.json    |  10 ++
 python/flink_agents/plan/tests/test_agent_plan.py  |  69 ++++++--
 .../plan/tests/test_resource_provider.py           |  68 ++++++++
 .../runner_context.py => plan/tools/__init__.py}   |  20 ---
 .../tools/function_tool.py}                        |  26 ++-
 .../flink_agents/runtime/flink_runner_context.py   |  17 +-
 python/flink_agents/runtime/local_runner.py        |   5 +
 .../runtime/tests/test_get_resource_in_action.py   |  93 +++++++++++
 .../runtime/operator/ActionExecutionOperator.java  |   6 +-
 .../runtime/python/utils/PythonActionExecutor.java |   6 +-
 20 files changed, 733 insertions(+), 139 deletions(-)

diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/chat_models/__init__.py
similarity index 66%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/api/chat_models/__init__.py
index 05b053d..e154fad 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/chat_models/__init__.py
@@ -15,23 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from abc import ABC, abstractmethod
-
-from flink_agents.api.event import Event
-
-
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
-
-    This context provides access to event handling.
-    """
-
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
-
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/chat_models/chat_model.py
similarity index 67%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/api/chat_models/chat_model.py
index 05b053d..b2fb985 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/chat_models/chat_model.py
@@ -15,23 +15,21 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from abc import ABC, abstractmethod
+from abc import ABC
 
-from flink_agents.api.event import Event
+from typing_extensions import override
 
+from flink_agents.api.resource import Resource, ResourceType
 
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
 
-    This context provides access to event handling.
-    """
+#TODO: Complete BaseChatModel
+class BaseChatModel(Resource, ABC):
+    """Base abstract class of all kinds of chat models.
 
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
+    Currently, this class is empty just for testing purposes.
+    """
 
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
+    @classmethod
+    @override
+    def resource_type(cls) -> ResourceType:
+        return ResourceType.CHAT_MODEL
diff --git a/python/flink_agents/api/decorators.py 
b/python/flink_agents/api/decorators.py
index 6a0b8b8..1d8e3bb 100644
--- a/python/flink_agents/api/decorators.py
+++ b/python/flink_agents/api/decorators.py
@@ -38,13 +38,49 @@ def action(*listen_events: Tuple[Type[Event], ...]) -> 
Callable:
     AssertionError
         If no events are provided to listen to.
     """
-    assert len(listen_events) > 0, 'action must have at least one event type 
to listen to'
+    assert len(listen_events) > 0, (
+        "action must have at least one event type to listen to"
+    )
 
     for event in listen_events:
-        assert issubclass(event, Event), 'action must only listen to event 
types.'
+        assert issubclass(event, Event), "action must only listen to event 
types."
 
     def decorator(func: Callable) -> Callable:
         func._listen_events = listen_events
         return func
 
     return decorator
+
+
+def chat_model(func: Callable) -> Callable:
+    """Decorator for marking a function declaring a chat model.
+
+    Parameters
+    ----------
+    func : Callable
+        Function to be decorated.
+
+    Returns:
+    -------
+    Callable
+        The same function, marked as declaring a chat model.
+    """
+    func._is_chat_model = True
+    return func
+
+
+def tool(func: Callable) -> Callable:
+    """Decorator for marking a function declaring a tool.
+
+    Parameters
+    ----------
+    func : Callable
+        Function to be decorated.
+
+    Returns:
+    -------
+    Callable
+        The same function, marked as declaring a tool.
+    """
+    func._is_tool = True
+    return func
diff --git a/python/flink_agents/api/resource.py 
b/python/flink_agents/api/resource.py
new file mode 100644
index 0000000..30b440d
--- /dev/null
+++ b/python/flink_agents/api/resource.py
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from abc import ABC, abstractmethod
+from enum import Enum
+
+from pydantic import BaseModel, model_validator
+
+
+class ResourceType(Enum):
+    """Type enum of resource.
+
+    Currently, only chat models and tools are supported.
+    """
+
+    CHAT_MODEL = "chat_model"
+    TOOL = "tool"
+    # EMBEDDING_MODEL = "embedding_model"
+    # PROMPT = "prompt"
+    # VECTOR_STORE = "vector_store"
+    # MCP_SERVER = "mcp_server"
+
+
+class Resource(BaseModel, ABC):
+    """Base abstract class of all kinds of resources, includes chat model,
+    prompt, tools and so on.
+
+    Resource extends BaseModel only to simplify attribute declaration in
+    subclasses; this does not mean that a Resource object is serializable.
+
+    Attributes:
+    ----------
+    name : str
+        The name of the resource.
+    type : ResourceType
+        The type of the resource.
+    """
+
+    name: str
+
+    @classmethod
+    @abstractmethod
+    def resource_type(cls) -> ResourceType:
+        """Return resource type of class."""
+
+
+class SerializableResource(Resource, ABC):
+    """Resource which is serializable."""
+
+    @model_validator(mode="after")
+    def validate_serializable(self) -> "SerializableResource":
+        """Ensure resource is serializable."""
+        self.model_dump_json()
+        return self
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/runner_context.py
index 05b053d..f36472a 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -18,6 +18,7 @@
 from abc import ABC, abstractmethod
 
 from flink_agents.api.event import Event
+from flink_agents.api.resource import Resource, ResourceType
 
 
 class RunnerContext(ABC):
@@ -35,3 +36,15 @@ class RunnerContext(ABC):
         event : Event
             The event to be processed by the agent system.
         """
+
+    @abstractmethod
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        """Get resource from context.
+
+        Parameters
+        ----------
+        name : str
+            The name of the resource.
+        type : ResourceType
+            The type of the resource.
+        """
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/tools/__init__.py
similarity index 66%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/api/tools/__init__.py
index 05b053d..e154fad 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/tools/__init__.py
@@ -15,23 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from abc import ABC, abstractmethod
-
-from flink_agents.api.event import Event
-
-
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
-
-    This context provides access to event handling.
-    """
-
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
-
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/tools/tool.py
similarity index 67%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/api/tools/tool.py
index 05b053d..566fa10 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/tools/tool.py
@@ -15,23 +15,21 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from abc import ABC, abstractmethod
+from abc import ABC
 
-from flink_agents.api.event import Event
+from typing_extensions import override
 
+from flink_agents.api.resource import ResourceType, SerializableResource
 
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
 
-    This context provides access to event handling.
-    """
+#TODO: Complete BaseTool
+class BaseTool(SerializableResource, ABC):
+    """Base abstract class of all kinds of tools.
 
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
+    Currently, this class is empty just for testing purposes
+    """
 
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
+    @classmethod
+    @override
+    def resource_type(cls) -> ResourceType:
+        return ResourceType.TOOL
diff --git a/python/flink_agents/plan/agent_plan.py 
b/python/flink_agents/plan/agent_plan.py
index e37d3a8..cb77f09 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -15,13 +15,22 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from typing import Dict, List
+from typing import Dict, List, Optional
 
-from pydantic import BaseModel
+from pydantic import BaseModel, field_serializer, model_validator
 
 from flink_agents.api.agent import Agent
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.plan.action import Action
 from flink_agents.plan.function import PythonFunction
+from flink_agents.plan.resource_provider import (
+    JavaResourceProvider,
+    JavaSerializableResourceProvider,
+    PythonResourceProvider,
+    PythonSerializableResourceProvider,
+    ResourceProvider,
+)
+from flink_agents.plan.tools.function_tool import FunctionTool
 
 
 class AgentPlan(BaseModel):
@@ -33,9 +42,74 @@ class AgentPlan(BaseModel):
         Mapping of action names to actions
     actions_by_event : Dict[Type[Event], str]
         Mapping of event types to the list of actions name that listen to them.
+    resource_providers : Optional[Dict[ResourceType, Dict[str, ResourceProvider]]]
+        Two-level mapping from resource type to resource name to resource provider.
     """
+
     actions: Dict[str, Action]
     actions_by_event: Dict[str, List[str]]
+    resource_providers: Optional[Dict[ResourceType, Dict[str, 
ResourceProvider]]] = None
+    __resources: Dict[ResourceType, Dict[str, Resource]] = {}
+
+    @field_serializer("resource_providers")
+    def __serialize_resource_providers(
+        self, providers: Dict[ResourceType, Dict[str, ResourceProvider]]
+    ) -> dict:
+        # append meta info to help deserialize resource providers
+        data = {}
+        for type in providers:
+            data[type] = {}
+            for name, provider in providers[type].items():
+                data[type][name] = provider.model_dump()
+                if isinstance(provider, PythonResourceProvider):
+                    data[type][name]["__resource_provider_type__"] = (
+                        "PythonResourceProvider"
+                    )
+                elif isinstance(provider, PythonSerializableResourceProvider):
+                    data[type][name]["__resource_provider_type__"] = (
+                        "PythonSerializableResourceProvider"
+                    )
+                elif isinstance(provider, JavaResourceProvider):
+                    data[type][name]["__resource_provider_type__"] = (
+                        "JavaResourceProvider"
+                    )
+                elif isinstance(provider, JavaSerializableResourceProvider):
+                    data[type][name]["__resource_provider_type__"] = (
+                        "JavaSerializableResourceProvider"
+                    )
+        return data
+
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "AgentPlan":
+        if "resource_providers" in self:
+            providers = self["resource_providers"]
+            # restore resource providers from serialized json.
+            if isinstance(providers, dict):
+                for type in providers:
+                    for name, provider in providers[type].items():
+                        if isinstance(provider, dict):
+                            provider_type = 
provider["__resource_provider_type__"]
+                            if provider_type == "PythonResourceProvider":
+                                self["resource_providers"][type][name] = (
+                                    
PythonResourceProvider.model_validate(provider)
+                                )
+                            elif provider_type == 
"PythonSerializableResourceProvider":
+                                self["resource_providers"][type][name] = (
+                                    
PythonSerializableResourceProvider.model_validate(
+                                        provider
+                                    )
+                                )
+                            elif provider_type == "JavaResourceProvider":
+                                self["resource_providers"][type][name] = (
+                                    
JavaResourceProvider.model_validate(provider)
+                                )
+                            elif provider_type == 
"JavaSerializableResourceProvider":
+                                self["resource_providers"][type][name] = (
+                                    
JavaSerializableResourceProvider.model_validate(
+                                        provider
+                                    )
+                                )
+        return self
 
     @staticmethod
     def from_agent(agent: Agent) -> "AgentPlan":
@@ -49,7 +123,22 @@ class AgentPlan(BaseModel):
                 if event_type not in actions_by_event:
                     actions_by_event[event_type] = []
                 actions_by_event[event_type].append(action.name)
-        return AgentPlan(actions=actions, actions_by_event=actions_by_event)
+
+        resource_providers = {}
+        for provider in _get_resource_providers(agent):
+            type = provider.type
+            if type not in resource_providers:
+                resource_providers[type] = {}
+            name = provider.name
+            assert name not in resource_providers[type], (
+                f"Duplicate resource name: {name}"
+            )
+            resource_providers[type][name] = provider
+        return AgentPlan(
+            actions=actions,
+            actions_by_event=actions_by_event,
+            resource_providers=resource_providers,
+        )
 
     def get_actions(self, event_type: str) -> List[Action]:
         """Get actions that listen to the specified event type.
@@ -66,6 +155,23 @@ class AgentPlan(BaseModel):
         """
         return [self.actions[name] for name in 
self.actions_by_event[event_type]]
 
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        """Get resource from agent plan.
+
+        Parameters
+        ----------
+        name : str
+            The name of the resource.
+        type : ResourceType
+            The type of the resource.
+        """
+        if type not in self.__resources:
+            self.__resources[type] = {}
+        if name not in self.__resources[type]:
+            resource_provider = self.resource_providers[type][name]
+            self.__resources[type][name] = resource_provider.provide()
+        return self.__resources[type][name]
+
 
 def _get_actions(agent: Agent) -> List[Action]:
     """Extract all registered agent actions from an agent.
@@ -82,12 +188,64 @@ def _get_actions(agent: Agent) -> List[Action]:
     """
     actions = []
     for name, value in agent.__class__.__dict__.items():
-        if isinstance(value, staticmethod) and hasattr(value, 
'_listen_events'):
-            actions.append(Action(name=name, 
exec=PythonFunction.from_callable(value.__func__),
-                                  
listen_event_types=[f'{event_type.__module__}.{event_type.__name__}'
-                                                      for event_type in 
value._listen_events]))
-        elif callable(value) and hasattr(value, '_listen_events'):
-            actions.append(Action(name=name, 
exec=PythonFunction.from_callable(value),
-                                  
listen_event_types=[f'{event_type.__module__}.{event_type.__name__}'
-                                                      for event_type in 
value._listen_events]))
+        if isinstance(value, staticmethod) and hasattr(value, 
"_listen_events"):
+            actions.append(
+                Action(
+                    name=name,
+                    exec=PythonFunction.from_callable(value.__func__),
+                    listen_event_types=[
+                        f"{event_type.__module__}.{event_type.__name__}"
+                        for event_type in value._listen_events
+                    ],
+                )
+            )
+        elif callable(value) and hasattr(value, "_listen_events"):
+            actions.append(
+                Action(
+                    name=name,
+                    exec=PythonFunction.from_callable(value),
+                    listen_event_types=[
+                        f"{event_type.__module__}.{event_type.__name__}"
+                        for event_type in value._listen_events
+                    ],
+                )
+            )
     return actions
+
+
+def _get_resource_providers(agent: Agent) -> List[ResourceProvider]:
+    resource_providers = []
+    for name, value in agent.__class__.__dict__.items():
+        if hasattr(value, "_is_chat_model"):
+            if isinstance(value, staticmethod):
+                value = value.__func__
+
+            if callable(value):
+                clazz, kwargs = value()
+                module = clazz.__module__
+                provider = PythonResourceProvider(
+                    name=name,
+                    type=clazz.resource_type(),
+                    module=module,
+                    clazz=clazz.__name__,
+                    kwargs=kwargs,
+                )
+                resource_providers.append(provider)
+        if hasattr(value, "_is_tool"):
+            if isinstance(value, staticmethod):
+                value = value.__func__
+
+            if callable(value):
+                # TODO: support other tool type.
+                func = PythonFunction.from_callable(value)
+                tool = FunctionTool(name=name, func=func)
+                provider = PythonSerializableResourceProvider(
+                    name=tool.name,
+                    type=tool.resource_type(),
+                    serialized=tool.model_dump(),
+                    module=tool.__module__,
+                    clazz=tool.__class__.__name__,
+                    resource=tool,
+                )
+                resource_providers.append(provider)
+    return resource_providers
diff --git a/python/flink_agents/plan/resource_provider.py 
b/python/flink_agents/plan/resource_provider.py
new file mode 100644
index 0000000..f26e04a
--- /dev/null
+++ b/python/flink_agents/plan/resource_provider.py
@@ -0,0 +1,143 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import importlib
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Optional
+
+from pydantic import BaseModel
+
+from flink_agents.api.resource import (
+    Resource,
+    ResourceType,
+    SerializableResource,
+)
+
+
+class ResourceProvider(BaseModel, ABC):
+    """Resource provider that carries resource meta to create
+     Resource object in runtime.
+
+    Attributes:
+    ----------
+    name : str
+        The name of the resource
+    type : ResourceType
+        The type of the resource
+    """
+
+    name: str
+    type: ResourceType
+
+    @abstractmethod
+    def provide(self) -> Resource:
+        """Create resource in runtime."""
+
+
+class SerializableResourceProvider(ResourceProvider, ABC):
+    """Resource Provider that carries Resource object or serialized object.
+
+    Attributes:
+    ----------
+    module : str
+        The module name of the resource.
+    clazz : str
+        The class name of the resource.
+    """
+
+    module: str
+    clazz: str
+
+
+class PythonResourceProvider(ResourceProvider):
+    """Python Resource provider that carries resource meta to create
+     Resource object in runtime.
+
+    Attributes:
+    ----------
+    module : str
+        The module name of the resource.
+    clazz : str
+        The class name of the resource.
+    kwargs : Dict[str, Any]
+        The initialization arguments of the resource.
+    """
+
+    module: str
+    clazz: str
+    kwargs: Dict[str, Any]
+
+    def provide(self) -> Resource:
+        """Create resource in runtime."""
+        module = importlib.import_module(self.module)
+        cls = getattr(module, self.clazz)
+        return cls(**self.kwargs)
+
+
+class PythonSerializableResourceProvider(SerializableResourceProvider):
+    """Resource Provider that carries Resource object or serialized object.
+
+    Attributes:
+    ----------
+    serialized : Dict[str, Any]
+        serialized resource object
+    resource : Optional[SerializableResource]
+        SerializableResource object
+    """
+
+    serialized: Dict[str, Any]
+    resource: Optional[SerializableResource] = None
+
+    def provide(self) -> Resource:
+        """Get or deserialize resource in runtime."""
+        if self.resource is None:
+            module = importlib.import_module(self.module)
+            clazz = getattr(module, self.clazz)
+            self.resource = clazz.model_validate(**self.serialized)
+        return self.resource
+
+
+# TODO: implementation
+class JavaResourceProvider(ResourceProvider):
+    """Represent Resource Provider declared by Java.
+
+    Currently, this class is only used for deserializing Java agent plan json.
+    """
+
+    def provide(self) -> Resource:
+        """Create resource in runtime."""
+        err_msg = (
+            "Currently, flink-agents doesn't support create resource "
+            "by JavaResourceProvider in python."
+        )
+        raise NotImplementedError(err_msg)
+
+
+# TODO: implementation
+class JavaSerializableResourceProvider(SerializableResourceProvider):
+    """Represent Serializable Resource Provider declared by Java.
+
+    Currently, this class is only used for deserializing Java agent plan json.
+    """
+
+    def provide(self) -> Resource:
+        """Get or deserialize resource in runtime."""
+        err_msg = (
+            "Currently, flink-agents doesn't support create resource "
+            "by JavaSerializableResourceProvider in python."
+        )
+        raise NotImplementedError(err_msg)
diff --git a/python/flink_agents/plan/tests/resources/agent_plan.json 
b/python/flink_agents/plan/tests/resources/agent_plan.json
index 7930405..abbc3a4 100644
--- a/python/flink_agents/plan/tests/resources/agent_plan.json
+++ b/python/flink_agents/plan/tests/resources/agent_plan.json
@@ -32,5 +32,21 @@
         "flink_agents.plan.tests.test_agent_plan.MyEvent": [
             "second_action"
         ]
+    },
+    "resource_providers": {
+        "chat_model": {
+            "mock": {
+                "name": "mock",
+                "type": "chat_model",
+                "module": "flink_agents.plan.tests.test_agent_plan",
+                "clazz": "MockChatModelImpl",
+                "kwargs": {
+                    "name": "mock",
+                    "host": "8.8.8.8",
+                    "desc": "mock resource just for testing."
+                },
+                "__resource_provider_type__": "PythonResourceProvider"
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/python/flink_agents/plan/tests/resources/resource_provider.json 
b/python/flink_agents/plan/tests/resources/resource_provider.json
new file mode 100644
index 0000000..c5ce41e
--- /dev/null
+++ b/python/flink_agents/plan/tests/resources/resource_provider.json
@@ -0,0 +1,10 @@
+{
+    "name": "mock",
+    "type": "chat_model",
+    "module": "flink_agents.plan.tests.test_resource_provider",
+    "clazz": "MockChatModelImpl",
+    "kwargs": {
+        "host": "8.8.8.8",
+        "desc": "mock chat model"
+    }
+}
\ No newline at end of file
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py 
b/python/flink_agents/plan/tests/test_agent_plan.py
index 6912044..a536e31 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -17,29 +17,32 @@
 
#################################################################################
 import json
 from pathlib import Path
+from typing import Any, Dict, Tuple, Type
 
 import pytest
 
 from flink_agents.api.agent import Agent
-from flink_agents.api.decorators import action
+from flink_agents.api.decorators import action, chat_model
 from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.plan.function import PythonFunction
 
 
-class TestAgent(Agent): #noqa D101
+class TestAgent(Agent):  # noqa D101
     @action(InputEvent)
     @staticmethod
-    def increment(event: Event, ctx: RunnerContext) -> None: #noqa D102
+    def increment(event: Event, ctx: RunnerContext) -> None:  # noqa D102
         value = event.input
         value += 1
         ctx.send_event(OutputEvent(output=value))
 
-def test_from_agent(): #noqa D102
+
+def test_from_agent():  # noqa D102
     agent = TestAgent()
     agent_plan = AgentPlan.from_agent(agent)
-    event_type = f'{InputEvent.__module__}.{InputEvent.__name__}'
+    event_type = f"{InputEvent.__module__}.{InputEvent.__name__}"
     actions = agent_plan.get_actions(event_type)
     assert len(actions) == 1
     action = actions[0]
@@ -50,47 +53,81 @@ def test_from_agent(): #noqa D102
     assert func.qualname == "TestAgent.increment"
     assert action.listen_event_types == [event_type]
 
-class InvalidAgent(Agent): #noqa D101
+
+class InvalidAgent(Agent):  # noqa D101
     @action(InputEvent)
     @staticmethod
-    def invalid_signature_action(event: Event) -> None: #noqa D102
+    def invalid_signature_action(event: Event) -> None:  # noqa D102
         pass
 
-def test_to_agent_invalid_signature() -> None: #noqa D103
+
+def test_to_agent_invalid_signature() -> None:  # noqa D103
     agent = InvalidAgent()
     with pytest.raises(TypeError):
         AgentPlan.from_agent(agent)
 
+
 class MyEvent(Event):
     """Event for testing purposes."""
 
-class MyAgent(Agent): # noqa: D101
+
+class MockChatModelImpl(Resource):  # noqa: D101
+    host: str
+    desc: str
+
+    @classmethod
+    def resource_type(cls) -> ResourceType: # noqa: D102
+        return ResourceType.CHAT_MODEL
+
+    def chat(self) -> str:
+        """For testing purposes."""
+        return self.host + " " + self.desc
+
+class MyAgent(Agent):  # noqa: D101
+    @chat_model
+    @staticmethod
+    def mock() -> Tuple[Type[Resource], Dict[str, Any]]: # noqa: D102
+        return MockChatModelImpl, {
+            "name": "mock",
+            "host": "8.8.8.8",
+            "desc": "mock resource just for testing.",
+        }
+
     @action(InputEvent)
     @staticmethod
-    def first_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: 
D102
+    def first_action(event: InputEvent, ctx: RunnerContext) -> None:  # noqa: 
D102
         pass
 
     @action(InputEvent, MyEvent)
     @staticmethod
-    def second_action(event: InputEvent, ctx: RunnerContext) -> None: # noqa: 
D102
+    def second_action(event: InputEvent, ctx: RunnerContext) -> None:  # noqa: 
D102
         pass
 
+
 @pytest.fixture(scope="module")
-def agent_plan() -> AgentPlan: # noqa: D103
+def agent_plan() -> AgentPlan:  # noqa: D103
     return AgentPlan.from_agent(MyAgent())
 
+
 current_dir = Path(__file__).parent
 
-def test_agent_plan_serialize(agent_plan: AgentPlan) -> None: # noqa: D103
+
+def test_agent_plan_serialize(agent_plan: AgentPlan) -> None:  # noqa: D103
     json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
-    with Path.open(Path(f'{current_dir}/resources/agent_plan.json')) as f:
+    with Path.open(Path(f"{current_dir}/resources/agent_plan.json")) as f:
         expected_json = f.read()
     actual = json.loads(json_value)
     expected = json.loads(expected_json)
     assert actual == expected
 
-def test_agent_plan_deserialize(agent_plan: AgentPlan) -> None: # noqa: D103
-    with Path.open(Path(f'{current_dir}/resources/agent_plan.json')) as f:
+
+def test_agent_plan_deserialize(agent_plan: AgentPlan) -> None:  # noqa: D103
+    with Path.open(Path(f"{current_dir}/resources/agent_plan.json")) as f:
         expected_json = f.read()
     deserialized_agent_plan = AgentPlan.model_validate_json(expected_json)
     assert deserialized_agent_plan == agent_plan
+
+def test_get_resource() -> None:  # noqa: D103
+    agent_plan = AgentPlan.from_agent(MyAgent())
+    mock = agent_plan.get_resource("mock", ResourceType.CHAT_MODEL)
+    assert mock.chat() == "8.8.8.8 mock resource just for testing."
diff --git a/python/flink_agents/plan/tests/test_resource_provider.py 
b/python/flink_agents/plan/tests/test_resource_provider.py
new file mode 100644
index 0000000..616cca2
--- /dev/null
+++ b/python/flink_agents/plan/tests/test_resource_provider.py
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+from pathlib import Path
+
+import pytest
+
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.plan.resource_provider import PythonResourceProvider, ResourceProvider
+
+current_dir = Path(__file__).parent
+
+
+class MockChatModelImpl(Resource):  # noqa: D101
+    host: str
+    desc: str
+
+    @classmethod
+    def resource_type(cls) -> ResourceType:  # noqa: D102
+        return ResourceType.CHAT_MODEL
+
+
+@pytest.fixture(scope="module")
+def resource_provider() -> ResourceProvider:  # noqa: D103
+    return PythonResourceProvider(
+        name="mock",
+        type=MockChatModelImpl.resource_type(),
+        module=MockChatModelImpl.__module__,
+        clazz=MockChatModelImpl.__name__,
+        kwargs={"host": "8.8.8.8", "desc": "mock chat model"},
+    )
+
+
+def test_python_resource_provider_serialize( # noqa: D103
+    resource_provider: ResourceProvider,
+) -> None:
+    json_value = resource_provider.model_dump_json(serialize_as_any=True)
+    with Path.open(Path(f"{current_dir}/resources/resource_provider.json")) as f:
+        expected_json = f.read()
+    actual = json.loads(json_value)
+    expected = json.loads(expected_json)
+    assert actual == expected
+
+
+def test_python_resource_provider_deserialize( # noqa: D103
+    resource_provider: ResourceProvider,
+) -> None:
+    with Path.open(Path(f"{current_dir}/resources/resource_provider.json")) as f:
+        expected_json = f.read()
+    expected_resource_provider = PythonResourceProvider.model_validate_json(
+        expected_json
+    )
+    assert resource_provider == expected_resource_provider
diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/plan/tools/__init__.py
similarity index 66%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/plan/tools/__init__.py
index 05b053d..e154fad 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/plan/tools/__init__.py
@@ -15,23 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 #################################################################################
-from abc import ABC, abstractmethod
-
-from flink_agents.api.event import Event
-
-
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
-
-    This context provides access to event handling.
-    """
-
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
-
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/plan/tools/function_tool.py
similarity index 67%
copy from python/flink_agents/api/runner_context.py
copy to python/flink_agents/plan/tools/function_tool.py
index 05b053d..2847b90 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/plan/tools/function_tool.py
@@ -15,23 +15,19 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 #################################################################################
-from abc import ABC, abstractmethod
+from typing import Any, Dict, Tuple
 
-from flink_agents.api.event import Event
+from flink_agents.api.tools.tool import BaseTool
+from flink_agents.plan.function import Function
 
 
-class RunnerContext(ABC):
-    """Abstract base class providing context for agent execution.
+#TODO: Complete FunctionTool
+class FunctionTool(BaseTool):
+    """Function tool.
 
-    This context provides access to event handling.
+    Currently, this class is just for testing purposes.
     """
-
-    @abstractmethod
-    def send_event(self, event: Event) -> None:
-        """Send an event to the agent for processing.
-
-        Parameters
-        ----------
-        event : Event
-            The event to be processed by the agent system.
-        """
+    func: Function
+    def call(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> Any:
+        """Call function."""
+        return self.func(*args, **kwargs)
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 980f70d..001735d 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -21,7 +21,9 @@ import cloudpickle
 from typing_extensions import override
 
 from flink_agents.api.event import Event
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
+from flink_agents.plan.agent_plan import AgentPlan
 
 
 class FlinkRunnerContext(RunnerContext):
@@ -30,7 +32,9 @@ class FlinkRunnerContext(RunnerContext):
     This context allows access to event handling.
     """
 
-    def __init__(self, j_runner_context: Any) -> None:
+    __agent_plan: AgentPlan
+
+    def __init__(self, j_runner_context: Any, agent_plan_json: str) -> None:
         """Initialize a flink runner context with the given java runner 
context.
 
         Parameters
@@ -39,6 +43,7 @@ class FlinkRunnerContext(RunnerContext):
             Java runner context used to synchronize data between Python and Java.
         """
         self._j_runner_context = j_runner_context
+        self.__agent_plan = AgentPlan.model_validate_json(agent_plan_json)
 
     @override
     def send_event(self, event: Event) -> None:
@@ -49,13 +54,17 @@ class FlinkRunnerContext(RunnerContext):
         event : Event
             The event to be processed by the agent system.
         """
+        class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
         try:
-            class_path = f"{event.__class__.__module__}.{event.__class__.__qualname__}"
             self._j_runner_context.sendEvent(class_path, cloudpickle.dumps(event))
         except Exception as e:
             err_msg = "Failed to send event " + class_path + " to runner context"
             raise RuntimeError(err_msg) from e
 
-def create_flink_runner_context(j_runner_context: Any) -> FlinkRunnerContext:
+    @override
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        return self.__agent_plan.get_resource(name, type)
+
+def create_flink_runner_context(j_runner_context: Any, agent_plan_json: str) -> FlinkRunnerContext:
     """Used to create a FlinkRunnerContext Python object in Pemja 
environment."""
-    return FlinkRunnerContext(j_runner_context)
+    return FlinkRunnerContext(j_runner_context, agent_plan_json)
diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py
index dfbb582..19bf795 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -24,6 +24,7 @@ from typing_extensions import override
 
 from flink_agents.api.agent import Agent
 from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.runtime.agent_runner import AgentRunner
@@ -91,6 +92,10 @@ class LocalRunnerContext(RunnerContext):
         logger.info("key: %s, send_event: %s", self.__key, event)
         self.events.append(event)
 
+    @override
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        return self.__agent_plan.get_resource(name, type)
+
 
 class LocalRunner(AgentRunner):
     """Agent runner implementation for local execution, which is
diff --git a/python/flink_agents/runtime/tests/test_get_resource_in_action.py b/python/flink_agents/runtime/tests/test_get_resource_in_action.py
new file mode 100644
index 0000000..89d5cb7
--- /dev/null
+++ b/python/flink_agents/runtime/tests/test_get_resource_in_action.py
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, Dict, Tuple, Type
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.chat_models.chat_model import BaseChatModel
+from flink_agents.api.decorators import action, chat_model, tool
+from flink_agents.api.event import InputEvent, OutputEvent
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.api.runner_context import RunnerContext
+
+
+class MockChatModelImpl(BaseChatModel):  # noqa: D101
+    host: str
+    desc: str
+
+    @classmethod
+    def resource_type(cls) -> ResourceType:  # noqa: D102
+        return ResourceType.CHAT_MODEL
+
+    def chat(self) -> str:
+        """For testing purposes."""
+        return self.host + " " + self.desc
+
+
+class MyAgent(Agent):  # noqa: D101
+    @chat_model
+    @staticmethod
+    def mock_chat_model() -> Tuple[Type[Resource], Dict[str, Any]]:  # noqa: D102
+        return MockChatModelImpl, {
+            "name": "mock_chat_model",
+            "host": "8.8.8.8",
+            "desc": "mock chat model just for testing.",
+        }
+
+    @tool
+    @staticmethod
+    def mock_tool(input: str) -> str:  # noqa: D102
+        return input + " mock tools just for testing."
+
+    @action(InputEvent)
+    @staticmethod
+    def mock_action(event: InputEvent, ctx: RunnerContext) -> None:  # noqa: D102
+        input = event.input
+        mock_chat_model = ctx.get_resource(
+            type=ResourceType.CHAT_MODEL, name="mock_chat_model"
+        )
+        mock_tool = ctx.get_resource(type=ResourceType.TOOL, name="mock_tool")
+        ctx.send_event(
+            OutputEvent(
+                output=input
+                + " "
+                + mock_chat_model.chat()
+                + " "
+                + mock_tool.call("call")
+            )
+        )
+
+
+def test_get_resource_in_action() -> None:  # noqa: D103
+    env = AgentsExecutionEnvironment.get_execution_environment()
+
+    input_list = []
+    agent = MyAgent()
+
+    output_list = env.from_list(input_list).apply(agent).to_list()
+
+    input_list.append({"key": "bob", "value": "the first message."})
+
+    env.execute()
+
+    assert output_list == [
+        {
+            "bob": "the first message. 8.8.8.8 mock chat model "
+            "just for testing. call mock tools just for testing."
+        }
+    ]
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index c749624..b85cf0b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 import org.apache.flink.agents.runtime.utils.EventUtil;
 import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -172,7 +173,10 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
                                     .getTmpDirectories(),
                             new HashMap<>(System.getenv()),
                             getRuntimeContext().getJobInfo().getJobId());
-            pythonActionExecutor = new PythonActionExecutor(pythonEnvironmentManager);
+            pythonActionExecutor =
+                    new PythonActionExecutor(
+                            pythonEnvironmentManager,
+                            new ObjectMapper().writeValueAsString(agentPlan));
             pythonActionExecutor.open();
         }
     }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 8a37ea4..b67e637 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -47,12 +47,14 @@ public class PythonActionExecutor {
     private static final String FLINK_RUNNER_CONTEXT_VAR_NAME = "flink_runner_context";
 
     private final PythonEnvironmentManager environmentManager;
+    private final String agentPlanJson;
     private final PythonRunnerContextImpl runnerContext;
 
     private PythonInterpreter interpreter;
 
-    public PythonActionExecutor(PythonEnvironmentManager environmentManager) {
+    public PythonActionExecutor(PythonEnvironmentManager environmentManager, String agentPlanJson) {
         this.environmentManager = environmentManager;
+        this.agentPlanJson = agentPlanJson;
         this.runnerContext = new PythonRunnerContextImpl();
     }
 
@@ -65,7 +67,7 @@ public class PythonActionExecutor {
 
         // TODO: remove the set and get runner context after updating pemja to version 0.5.3
         Object pythonRunnerContextObject =
-                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext);
+                interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext, agentPlanJson);
         interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject);
     }
 

Reply via email to