GitHub user wenjin272 closed a discussion: Resource & Resource Provider Design
## 1. Introduce
This doc will introduce Resource & Resource Provider, which is an unified
abstraction in Flink-Agents for all kinds of resources such as models, tools,
vector_stores and so on. This abstraction can make the Runtime of Flink-Agents
not need be aware of the difference among various resources, and supports lazy
initialization and reuse of resources.
Sections 2, 3, and 4 will cover:
- The concept and interface of Resource & Resource Provider
- Example about how to defined specific Resource & Resource Provider, and use
them in an agent.
- Explanation of how does Resource & Resource Provider work internally,
includes compile stage and runtime stage.
## 2. Concept & Interface
### Resource
Resource is the abstraction of object defined in user codes, like model, tools
and so on. It
- provides unified type declaration for various built-in or user-defined
resource
- represented as resource with different types in runtime
```
class ResourceType(Enum):
chat_model = "chat_model"
embedding = "embedding_model"
prompt = "prompt"
tool = "tool"
vector_store = "vector_store"
mcp_server = "mcp_server"
```
```
class Resource:
"""Base interface for user defined resources, like model, prompt, tool."""
name: str
type: ResourceType
```
### Resource Provider
Resource Provider is the factory to create Resource object, it
- supports lazy initialization and reuse of Resource object.
- should be serializable to support use in other process.
- represented as resource provider provides specific type resource in runtime
```
class ResourceProvider(BaseModel, ABC):
"""Resource provider carries resource meta to construct
Resource object in runtime.
Attributes:
----------
name : str
The name of the resource
module : str
The module name of the resource.
clazz : str
The class name of the resource.
kwargs : Dict[str, Any]
The initialization arguments of the resource.
"""
name: str
module: str
clazz: str
kwargs: Dict[str, Any]
@abstractmethod
def provide(self) -> Resource:
module = importlib.import_module(self.module)
cls = getattr(module, self.clazz)
return cls(**self.kwargs)
```
## 3. API
This section will introduce how can user define their own Resource by
implements Resource interface, and decalare a ResourceProvider in agent to use
it.
Taking chat_model as an example.
- Flink-Agents will provide the ChatModel interface, which implements Resource
to decalare it is Resource, and provides chat model specific interface
```
class ChatModel(Resource, ABC):
"""Abstract base class defining the interface for chat models."""
tools: List[str] = []
@abstractmethod
def chat(self, messages: List[ChatMessage]) -> ChatMessage | List[ToolCall]:
@abstractmethod
def bind_tools(self, tools: List[ToolDescriptor]) -> None:
```
- User can implement ChatModel interface to defined own chat model
```
class UserDefinedChatModel(ChatModel):
model: str
host: str
def chat(self, messages: List[ChatMessage]) -> ChatMessage | List[ToolCall]:
"""user defined logic"""
def bind_tools(self, tools: List[ToolDescriptor]) -> None:
"""user defined logic"""
```
- User can add ResourceProvider in agent to use Resource in actions.
```
class MockChatAgent(Agent):
@model
@staticmethod
def llm() -> Tuple[Type[ChatModel], Dict[str, Any]]:
return UserDefinedChatModel,
{'model': 'qwen2.5:7b',
'host': 'http://localhost:9999'}
```
-
- The `@model` decorator indicates that the `llm` method defines a model
resource provider.
- The method returns a factory class and a dictionary of arguments, which
the framework will use to create the model resource.
- The method name llm will be the name of ResourceProvider and Resource.
- User can get Resource instance by name and type in action, and use it.
```
def call_chat_model(
event: ChatModelEvent | ToolEvent, state: State, ctx: RunnerContext
) -> None:
model = ctx.get_resource(event.model, ResourceType.chat_model)
response = model.chat(event.messages)
ctx.send_event(ChatModelResponseEvent(request=event, response=response))
```
- we can provide some syntactic sugar like get_model, get_tool to improve
usability
## 4. How does Resource & Resource Provider work internally
This section will introduce how does Flink-Agents framework use Resource &
Resource Provider, includes:
-
- In compile stage, convert user defined factory function in agent to
Resource Provider object in agent plan.
- In runtime stage, get resource provider from agent plan by name and
resource type
- In runtime stage, create resource by resource provider and supports reuse
in Context wise.
- Convert factory function defined in agent to a two-level dict
'type->name->ResourceProvider' in agent plan.
```
class AgentPlan(BaseModel):
"""Agent plan managing actions, models, and tools."""
actions: Dict[str, action]
resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]]
actions_by_event: Dict[Type[Event], List[Action]]
resources: Dict[ResourceType, Dict[str, Resource]]
```
- There will be an agent Runner Context to manage resources, events, state in
agent execution. The agent plan will be an property of Context.
- In action, user will get resource from Context, Context will return resource
from agent plan.
```
class RunnerContextImpl(RunnerContext):
__agent_plan: AgentPlan
__id: UUID
state: State
events: deque[Event]
"""get resource according to name and type."""
def get_resource(self, name: str, resource_type: ResourceType) -> Resource:
return self.__agent_plan.get_resource(name, resource_type)
```
- agent plan will return cached resource directly or crate resource object by
resource provider and return it.
```
class AgentPlan(BaseModel):
"""Agent plan managing actions, models, and tools."""
actions: Dict[str, action]
resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]]
actions_by_event: Dict[Type[Event], List[Action]]
resources: Dict[ResourceType, Dict[str, Resource]]
def get_resource(name: str, resource_type: ResourceType) -> Resource:
if name not in self.resources[resource_type][name]:
resource_provider = self.__agent.get_resource_provider(name,
resource_type)
self.resources[resource_type][name] = resource_provider.provide()
return self.resources[resource_type][name]
```
## 5. An implementation of Resource & Resource Provider for special resource
For some kinds of resources, user may prefer define resource directly in Agent,
rather than define a factory function.
Take tool as an example, user defines a python function, and the function
itself is a Resource. Require user to define a factory function which returns
the tool function is unreasonable.
```
class MockChatAgent(Agent):
@tool
@staticmethod
def add(a: int, b: int) -> int:
print(f"calling `add`: ({a}, {b})")
return a + b
```
For these kinds of resources, we provide SerializableResource &
SerializableResourceProvider interface
### SerializableResource
SerializableResource object should be serializable
```
class SerializableResource(Resource, BaseModel):
"""Resource which is serializable"""
```
### SerializableResourceProvider
SerializableResourceProvider will return the SerializableResource object
directly.
```
class SerializableResourceProvider(ResourceProvider):
"""Resource Provider which persist Resource Object.
Attributes:
----------
module : str
module of the resource class.
classname : str
class name of the resource class
serialized : Dict[str, Any]
serilaized resource object
resource : Optional[SerializableResource]
SerializableResource object
"""
module: str
classname: str
serialized: Dict[str, Any]
resource: Optional[SerializableResource] = None
def provide(self) -> Resource:
if self.resource is None:
module = importlib.import_module(self.module)
clazz = getattr(module, self.classname)
self.resource = clazz(**self.serialized)
return self.resource
```
## 6. Resource Access Cross Language
This section will introduce the use case which may access resource cross
language, and talk about the brief thought of how to solve it. In our early
stage, we don't need implement this.
### What is Resource Access Cross Language
Flink-Agents will provide two API, python and java, and user may mix it to
reuse some action or resource defined previous. And this may lead
- Use Java Resource object in Python Action
- Use Python Resource object in Java Action

### How to solve this problem (overall)
- Use pemja
- pemja (https://github.com/alibaba/pemja) is a cross language call
framework based on FFI, and is used by PyFlink. It supports Java call Python,
and Java call Python call Java. Because our dispatcher is running in JVM, so
pemja can satisfy our requirement.
- The python API and java API should provide identical interface for each kind
of Resource, includes ChatModel, Tool, VectorStore and so on.
- And, for each kind of Resource, there will be an correspond Wrapper class in
Python and Java. Take ChatModel as an example.
- python wrapper class
```
from pemja import findClass
JavaChatModelClass = findClass("flink.agents.api.ChatModel")
class JavaChatModel(ChatModel):
tools: List[str] = []
chat_model: JavaChatModelClass
def __init__(self, clazz: Str, *args) -> None:
"""
Args:
clazz: fully qualified name of java resource
args: init parameters of java resource
"""
JavaChatModelImplementationClass = findClass(clazz);
self.chat_modle=JavaChatModelImplementationClass(*args);
def chat(self, messages: List[ChatMessage]) -> ChatMessage | List[ToolCall]:
return self.chat_model.chat(messages)
@abstractmethod
def bind_tools(self, tools: List[ToolDescriptor]) -> None:
self.chat_model.bindTools(tools)
```
-
- java wrapper class
```
import pemja.core.object.PyObject;
public class PyChatModel extends ChatModel {
private final PythonInterpreter interpreter
private final String module;
private final String clazz;
private PyObject chatModel;
public PyChatModel(PythonInterpreter interpreter,
String module,
String clazz,
Map<String, Object> kwargs) {
this.module = module;
this.clazz = clazz;
this.interpreter = interpreter;
interpreter.exec(String.format("import %s", module));
this.chatModel = interpreter.invoke(String.format("%s.__init__"),
kwargs)
}
public ChatMessage chat(List<ChatMessage> messages) {
return this.chatModel.invokeMethod("chat", messages)
}
public void bindTools(List<ToolDescriptor> tools) {
this.chatModel.invokeMethod("bind_tools", tools)
}
}
```
### How to use other language defined resource in Agent
User may already implements some resource in Python or Java, and want reuse it
when create and Agent use Java or Python API
Take define Java ChatModel in Python API as an example
- User declare a java ChatModel in Agent just like declare a python ChatModel.
The difference is
- The ChatModel class must be JavaChatModel
- The arguments dict must contain the Java ChatModel fully qualified name
```
class MockChatAgent(Agent):
@model
@staticmethod
def llm() -> Tuple[Type[ChatModel], Dict[str, Any]]:
return JavaChatModel,
{'clazz': 'org.apache.flink.agents.example.UserDefinedChatModel'
'model': 'qwen2.5:7b',
'host': 'http://localhost:9999'}
```
- There will be no difference in compile stage between using Python Resource
and Java Resource.
- When get this resource in action, according to the actually executor (Pyhon
or Java) of action , there will be two case
- execute an python action: we need create python JavaChatModel object
according to the info in ResourceProvider
- execute an java action: we create java ChatModel object directly
according to the info in ResourceProvider
Overall, we just store the cross language resource meta info and init arguments
in compile stage, and decide whether need to create Wrapper class in runtime
stage.
GitHub link: https://github.com/apache/flink-agents/discussions/41
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]