GitHub user wenjin272 closed a discussion: Resource & Resource Provider Design

## 1. Introduction

This document introduces Resource and Resource Provider, a unified abstraction
in Flink-Agents for all kinds of resources, such as models, tools, and vector
stores. With this abstraction, the Flink-Agents runtime does not need to be
aware of the differences among resource types, and resources can be lazily
initialized and reused.

Sections 2, 3, and 4 cover:
- The concept and interface of Resource and Resource Provider.
- An example of how to define a specific Resource and Resource Provider and
use them in an agent.
- An explanation of how Resource and Resource Provider work internally,
including the compile stage and the runtime stage.

## 2. Concept & Interface
### Resource
A Resource is the abstraction of an object defined in user code, such as a
model or a tool. It
- provides a unified type declaration for built-in and user-defined resources
- is represented at runtime as a resource of a specific type

```
from enum import Enum


class ResourceType(Enum):
    chat_model = "chat_model"
    embedding = "embedding_model"
    prompt = "prompt"
    tool = "tool"
    vector_store = "vector_store"
    mcp_server = "mcp_server"
```


```
class Resource:
    """Base interface for user defined resources, like model, prompt, tool."""
    name: str
    type: ResourceType
```
### Resource Provider
A Resource Provider is the factory that creates Resource objects. It
- supports lazy initialization and reuse of Resource objects
- must be serializable so that it can be used in another process
- is represented at runtime as a provider of a specific type of resource
```
import importlib
from abc import ABC, abstractmethod
from typing import Any, Dict

from pydantic import BaseModel  # assumption: BaseModel is pydantic's


class ResourceProvider(BaseModel, ABC):
    """Resource provider carries resource meta to construct
    a Resource object at runtime.

    Attributes:
    ----------
    name : str
        The name of the resource.
    module : str
        The module name of the resource.
    clazz : str
        The class name of the resource.
    kwargs : Dict[str, Any]
        The initialization arguments of the resource.
    """
    name: str
    module: str
    clazz: str
    kwargs: Dict[str, Any]

    @abstractmethod
    def provide(self) -> Resource:
        # Default behavior: import the class by name and construct it.
        # Subclasses may reuse this through super().provide().
        module = importlib.import_module(self.module)
        cls = getattr(module, self.clazz)
        return cls(**self.kwargs)
```
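
As a rough illustration of the two properties listed above (serializable
metadata and lazy construction), the sketch below uses a plain dataclass instead
of the `ResourceProvider` base class. `SimpleProvider` is a hypothetical name,
and `fractions.Fraction` merely stands in for a real resource class; neither is
part of Flink-Agents.

```python
import importlib
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class SimpleProvider:
    """Hypothetical stand-in for ResourceProvider: metadata only, no live object."""

    name: str
    module: str
    clazz: str
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def provide(self) -> Any:
        # The class is imported and instantiated only when provide() is called.
        module = importlib.import_module(self.module)
        cls = getattr(module, self.clazz)
        return cls(**self.kwargs)


provider = SimpleProvider(
    name="half",
    module="fractions",
    clazz="Fraction",
    kwargs={"numerator": 1, "denominator": 2},
)

# The provider is plain data, so it can be shipped to another process as JSON.
payload = json.dumps(asdict(provider))
restored = SimpleProvider(**json.loads(payload))

# The resource object itself is created on the receiving side.
resource = restored.provide()
```

Only the module name, class name, and arguments cross the process boundary, so
the resource class itself does not have to be serializable.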

## 3. API
This section shows how users can define their own Resource by implementing the
Resource interface, and declare a ResourceProvider in an agent to use it.
We take chat_model as an example.
- Flink-Agents will provide the ChatModel interface, which extends Resource to
declare that it is a Resource and adds the chat-model-specific methods.
```
class ChatModel(Resource, ABC):
    """Abstract base class defining the interface for chat models."""

    tools: List[str] = []

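    @abstractmethod
    def chat(self, messages: List[ChatMessage]) -> ChatMessage | List[ToolCall]:
        """Send messages to the model and return its reply or tool calls."""

    @abstractmethod
    def bind_tools(self, tools: List[ToolDescriptor]) -> None:
        """Bind the tools that the model may call."""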
```
        
- Users can implement the ChatModel interface to define their own chat model.
```
class UserDefinedChatModel(ChatModel):
    model: str
    host: str

    def chat(self, messages: List[ChatMessage]) -> ChatMessage | List[ToolCall]:
      """user defined logic"""

    def bind_tools(self, tools: List[ToolDescriptor]) -> None:
      """user defined logic"""
```
        
- Users can add a ResourceProvider to an agent to use the Resource in actions.
```
class MockChatAgent(Agent):
  
    @model
    @staticmethod
    def llm() -> Tuple[Type[ChatModel], Dict[str, Any]]:
        return UserDefinedChatModel, {
            'model': 'qwen2.5:7b',
            'host': 'http://localhost:9999',
        }
```
In this example:
- The `@model` decorator indicates that the `llm` method defines a model
resource provider.
- The method returns a factory class and a dictionary of arguments, which
the framework uses to create the model resource.
- The method name `llm` becomes the name of both the ResourceProvider and the
Resource.
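
One possible way the `@model` decorator and the compile step could cooperate is
sketched below. Everything here is an assumption for illustration: the
`_resource_type` marker attribute, the `collect_providers` helper, and the
`EchoModel` class are hypothetical and are not the Flink-Agents implementation.

```python
from typing import Any, Dict


def model(func):
    """Mark a (static) factory method as a chat model resource provider."""
    target = func.__func__ if isinstance(func, staticmethod) else func
    target._resource_type = "chat_model"  # hypothetical marker attribute
    return func


class EchoModel:
    """Placeholder resource class used only for this sketch."""

    def __init__(self, model: str, host: str) -> None:
        self.model = model
        self.host = host


class DemoAgent:
    @model
    @staticmethod
    def llm():
        return EchoModel, {"model": "qwen2.5:7b", "host": "http://localhost:9999"}


def collect_providers(agent_cls: type) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Build a two-level dict: resource type -> name -> provider metadata."""
    providers: Dict[str, Dict[str, Dict[str, Any]]] = {}
    for name, attr in vars(agent_cls).items():
        func = attr.__func__ if isinstance(attr, staticmethod) else attr
        resource_type = getattr(func, "_resource_type", None)
        if resource_type is None:
            continue  # not a decorated factory method
        cls, kwargs = func()
        providers.setdefault(resource_type, {})[name] = {
            "name": name,  # the method name becomes the resource name
            "module": cls.__module__,
            "clazz": cls.__name__,
            "kwargs": kwargs,
        }
    return providers


providers = collect_providers(DemoAgent)
```

The factory method is called once at compile time to obtain the class and its
arguments; no resource object is created at that point.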
   
- Users can get a Resource instance by name and type in an action, and use it.

```
def call_chat_model(
    event: ChatModelEvent | ToolEvent, state: State, ctx: RunnerContext
) -> None:
  
  model = ctx.get_resource(event.model, ResourceType.chat_model)
  response = model.chat(event.messages)
  ctx.send_event(ChatModelResponseEvent(request=event, response=response))
```
- We can provide some syntactic sugar, such as `get_model` and `get_tool`, to
improve usability.
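
Such helpers could be thin wrappers that fix the resource type, as in the
following sketch. `DemoContext` is a hypothetical stand-in for the runner
context, and plain strings replace `ResourceType` to keep the example
self-contained.

```python
from typing import Any, Dict, Tuple


class DemoContext:
    """Hypothetical context holding resources keyed by (type, name)."""

    def __init__(self, resources: Dict[Tuple[str, str], Any]) -> None:
        self._resources = resources

    def get_resource(self, name: str, resource_type: str) -> Any:
        return self._resources[(resource_type, name)]

    # Sugar: the resource type is fixed, so callers pass only the name.
    def get_model(self, name: str) -> Any:
        return self.get_resource(name, "chat_model")

    def get_tool(self, name: str) -> Any:
        return self.get_resource(name, "tool")


ctx = DemoContext(
    {
        ("chat_model", "llm"): "fake-model",
        ("tool", "add"): lambda a, b: a + b,
    }
)
chat_model = ctx.get_model("llm")
total = ctx.get_tool("add")(1, 2)
```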

## 4. How Resource & Resource Provider work internally

This section describes how the Flink-Agents framework uses Resource and
Resource Provider:

- In the compile stage, it converts the user-defined factory functions in the
agent into Resource Provider objects in the agent plan.
- In the runtime stage, it gets the resource provider from the agent plan by
name and resource type.
- In the runtime stage, it creates the resource from the resource provider and
reuses it within the scope of the Context.

The details are as follows.

- The factory functions defined in the agent are converted into a two-level
dict `type -> name -> ResourceProvider` in the agent plan.
```
class AgentPlan(BaseModel):
    """Agent plan managing actions, models, and tools."""

    actions: Dict[str, Action]
    resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]]
    actions_by_event: Dict[Type[Event], List[Action]]
    resources: Dict[ResourceType, Dict[str, Resource]]
```
- An agent Runner Context manages resources, events, and state during agent
execution. The agent plan is a property of the Context.
- In an action, the user gets a resource from the Context, and the Context
returns the resource from the agent plan.
```
class RunnerContextImpl(RunnerContext):
    __agent_plan: AgentPlan
    __id: UUID
    state: State
    events: deque[Event]

    def get_resource(self, name: str, resource_type: ResourceType) -> Resource:
        """Get resource according to name and type."""
        return self.__agent_plan.get_resource(name, resource_type)
```
- The agent plan returns the cached resource directly, or creates the resource
object with the resource provider, caches it, and returns it.
```
class AgentPlan(BaseModel):
    """Agent plan managing actions, models, and tools."""

    actions: Dict[str, Action]
    resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]]
    actions_by_event: Dict[Type[Event], List[Action]]
    resources: Dict[ResourceType, Dict[str, Resource]]

    def get_resource(self, name: str, resource_type: ResourceType) -> Resource:
        # Create the resource on first access, then reuse the cached object.
        cached = self.resources.setdefault(resource_type, {})
        if name not in cached:
            resource_provider = self.resource_providers[resource_type][name]
            cached[name] = resource_provider.provide()
        return cached[name]
```
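
The create-once-then-reuse behavior can be checked with a small self-contained
sketch. `DemoPlan` and `make_model` are hypothetical; providers are reduced to
zero-argument callables, and plain strings replace `ResourceType`.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List


class DemoPlan:
    """Hypothetical plan: providers by type and name, plus a resource cache."""

    def __init__(self, providers: Dict[str, Dict[str, Callable[[], Any]]]) -> None:
        self.providers = providers
        self.resources: Dict[str, Dict[str, Any]] = defaultdict(dict)

    def get_resource(self, name: str, resource_type: str) -> Any:
        cache = self.resources[resource_type]
        if name not in cache:
            # First access: ask the provider to build the resource.
            cache[name] = self.providers[resource_type][name]()
        return cache[name]


calls: List[str] = []


def make_model() -> object:
    calls.append("created")  # record every construction
    return object()


plan = DemoPlan({"chat_model": {"llm": make_model}})
first = plan.get_resource("llm", "chat_model")
second = plan.get_resource("llm", "chat_model")
```

Both lookups return the same object, and the provider runs only once.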

## 5. An implementation of Resource & Resource Provider for special resources

For some kinds of resources, users may prefer to define the resource directly
in the Agent rather than define a factory function.
Take a tool as an example: the user defines a Python function, and the function
itself is the Resource. Requiring the user to define a factory function that
returns the tool function would be unreasonable.
```
class MockChatAgent(Agent):
  
    @tool
    @staticmethod
    def add(a: int, b: int) -> int:
        print(f"calling `add`: ({a}, {b})")
        return a + b
```
For these kinds of resources, we provide the SerializableResource and
SerializableResourceProvider interfaces.
### SerializableResource
A SerializableResource object must be serializable.
```
class SerializableResource(Resource, BaseModel):
    """Resource which is serializable"""
```
### SerializableResourceProvider
SerializableResourceProvider will return the SerializableResource object 
directly.
```
class SerializableResourceProvider(ResourceProvider):
    """Resource Provider which persists the Resource object.

    Attributes:
    ----------
    module : str
        Module of the resource class.
    classname : str
        Class name of the resource class.
    serialized : Dict[str, Any]
        Serialized resource object.
    resource : Optional[SerializableResource]
        SerializableResource object, created on first use.
    """
    
    module: str
    classname: str
    serialized: Dict[str, Any]
    resource: Optional[SerializableResource] = None

    def provide(self) -> Resource:
        if self.resource is None:
            module = importlib.import_module(self.module)
            clazz = getattr(module, self.classname)
            self.resource = clazz(**self.serialized)
        return self.resource
```
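
To make the idea concrete, the sketch below serializes a tool that refers to a
function by module and name, then restores it lazily from the serialized form.
`FunctionTool` and `SerializedProvider` are hypothetical simplifications built
on dataclasses, and `operator.add` stands in for a user-defined tool function.

```python
import importlib
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class FunctionTool:
    """Hypothetical serializable resource: refers to a function by name."""

    name: str
    module: str
    qualname: str

    def call(self, *args: Any, **kwargs: Any) -> Any:
        func = getattr(importlib.import_module(self.module), self.qualname)
        return func(*args, **kwargs)


@dataclass
class SerializedProvider:
    """Hypothetical provider that keeps the serialized resource itself."""

    serialized: Dict[str, Any]
    resource: Optional[FunctionTool] = None

    def provide(self) -> FunctionTool:
        if self.resource is None:
            # Rebuild the resource from its serialized fields on first use.
            self.resource = FunctionTool(**self.serialized)
        return self.resource


tool = FunctionTool(name="add", module="operator", qualname="add")
provider = SerializedProvider(serialized=asdict(tool))

restored = provider.provide()
result = restored.call(2, 3)
```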
## 6. Resource Access Cross Language

This section introduces the use cases that require accessing a resource across
languages and briefly discusses how to support them. We do not need to
implement this in the early stage.

### What is Resource Access Cross Language
Flink-Agents will provide two APIs, Python and Java, and users may mix them to
reuse actions or resources defined previously. This leads to two cases:
- Using a Java Resource object in a Python action
- Using a Python Resource object in a Java action
![image](https://github.com/user-attachments/assets/14d716fe-3532-4644-9911-11c5a97c3d9d)

### How to solve this problem (overall)
- Use pemja.
    - pemja (https://github.com/alibaba/pemja) is a cross-language call
framework based on FFI, and is used by PyFlink. It supports Java calling
Python, and Java calling Python which calls back into Java. Because our
dispatcher runs in the JVM, pemja can satisfy our requirement.
- The Python API and the Java API should provide an identical interface for
each kind of Resource, including ChatModel, Tool, VectorStore, and so on.
- For each kind of Resource, there will be a corresponding wrapper class in
Python and in Java. Take ChatModel as an example.
    - Python wrapper class
```
from typing import List

from pemja import findClass

JavaChatModelClass = findClass("flink.agents.api.ChatModel")


class JavaChatModel(ChatModel):
    tools: List[str] = []
    chat_model: JavaChatModelClass

    def __init__(self, clazz: str, *args) -> None:
        """
        Args:
          clazz: fully qualified name of java resource
          args: init parameters of java resource
        """
        # Look up the Java implementation class and instantiate it.
        JavaChatModelImplementationClass = findClass(clazz)
        self.chat_model = JavaChatModelImplementationClass(*args)

    def chat(self, messages: List[ChatMessage]) -> ChatMessage | List[ToolCall]:
        return self.chat_model.chat(messages)

    def bind_tools(self, tools: List[ToolDescriptor]) -> None:
        self.chat_model.bindTools(tools)
```
    - Java wrapper class
```
import java.util.List;
import java.util.Map;

import pemja.core.PythonInterpreter;
import pemja.core.object.PyObject;

public class PyChatModel extends ChatModel {
    private final PythonInterpreter interpreter;
    private final String module;
    private final String clazz;
    private final PyObject chatModel;

    public PyChatModel(PythonInterpreter interpreter,
                       String module,
                       String clazz,
                       Map<String, Object> kwargs) {
        this.module = module;
        this.clazz = clazz;
        this.interpreter = interpreter;
        interpreter.exec(String.format("import %s", module));
        // Calling the Python class constructs the object with kwargs.
        this.chatModel = (PyObject) interpreter.invoke(
                String.format("%s.%s", module, clazz), kwargs);
    }

    public ChatMessage chat(List<ChatMessage> messages) {
        return (ChatMessage) this.chatModel.invokeMethod("chat", messages);
    }

    public void bindTools(List<ToolDescriptor> tools) {
        this.chatModel.invokeMethod("bind_tools", tools);
    }
}
```
### How to use a resource defined in another language in an Agent
Users may have already implemented some resources in Python or Java and want to
reuse them when creating an Agent with the other language's API.
Take defining a Java ChatModel with the Python API as an example.
- The user declares a Java ChatModel in the Agent just like declaring a Python
ChatModel. The differences are:
    - The ChatModel class must be JavaChatModel.
    - The arguments dict must contain the fully qualified name of the Java
ChatModel.
```
class MockChatAgent(Agent):
    @model
    @staticmethod
    def llm() -> Tuple[Type[ChatModel], Dict[str, Any]]:
        return JavaChatModel, {
            'clazz': 'org.apache.flink.agents.example.UserDefinedChatModel',
            'model': 'qwen2.5:7b',
            'host': 'http://localhost:9999',
        }
```
  
- There is no difference in the compile stage between using a Python Resource
and a Java Resource.
- When the resource is requested in an action, there are two cases, depending
on the actual executor (Python or Java) of the action:
    - Executing a Python action: we need to create a Python JavaChatModel
object according to the info in the ResourceProvider.
    - Executing a Java action: we create the Java ChatModel object directly
according to the info in the ResourceProvider.

Overall, we only store the cross-language resource meta info and init arguments
in the compile stage, and decide whether a wrapper class is needed in the
runtime stage.
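
The runtime decision described above could be reduced to a comparison between
the language of the resource and the language of the executor. The following
sketch assumes a hypothetical `language` field in the provider metadata and a
hypothetical `creation_mode` helper; the actual wrapper construction (for
example through pemja) is left out.

```python
from dataclasses import dataclass, field
from typing import Any, Dict

LANGUAGES = ("python", "java")


@dataclass
class ProviderMeta:
    """Hypothetical metadata stored in the compile stage."""

    language: str  # language in which the resource is implemented
    clazz: str
    kwargs: Dict[str, Any] = field(default_factory=dict)


def creation_mode(meta: ProviderMeta, executor: str) -> str:
    """Return "native" if no wrapper is needed, otherwise "wrapper"."""
    if meta.language not in LANGUAGES or executor not in LANGUAGES:
        raise ValueError("unsupported language")
    # Same language: construct the resource directly.
    # Different language: construct the wrapper class on the executor side.
    return "native" if meta.language == executor else "wrapper"


meta = ProviderMeta(
    language="java",
    clazz="org.apache.flink.agents.example.UserDefinedChatModel",
    kwargs={"model": "qwen2.5:7b", "host": "http://localhost:9999"},
)
in_python_action = creation_mode(meta, "python")
in_java_action = creation_mode(meta, "java")
```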

GitHub link: https://github.com/apache/flink-agents/discussions/41
