GitHub user xintongsong closed a discussion: MVP Design Doc - (WIP, incrementally updating)
# Overview

The figure above shows the overall idea of how we expect users to use Flink Agents.

1. Flink Agents provides APIs in various programming languages for users to define agents.
   - a. We start with Python and Java APIs, and may support more languages in the future.
   - b. APIs in different programming languages should share a common set of concepts and primitives. For a certain language, we may only support a subset of the concepts and primitives. E.g., with declarative languages like YAML, we may only support the ReAct-style agent but not the workflow-style agent.
   - c. Users may even use multiple languages. E.g., build an agent in Java that includes a custom action defined in Python. (Not necessarily supported in the MVP.)
2. While building agents, users may leverage built-in or 3rd-party libraries to integrate their agents with a rich ecosystem.
   - a. This is similar to Flink's connectors or filesystems. The framework defines standard interfaces for the common building blocks of agents (LLMs, tools, vector stores, etc.), for which different pluggable implementations can be provided to integrate with various systems and providers.
3. Once the agents are defined, we need to connect them with inputs and outputs.
   - a. An agent can either be part of a Flink job that consumes from an input datastream and generates an output datastream, or a standalone service that accepts REST/RPC requests and replies with responses (not necessarily supported in the MVP).
   - b. How we connect agents with inputs / outputs (using the DataStream or Table API, in Java or Python, or using SQL) should be independent from how we define the agents (using the Java or Python Agent API).
4. The agents can be executed in various runtimes.
   - a. A local Python runtime is helpful for easily trying out and debugging agents, without relying on a Flink cluster, a Docker / Kubernetes environment, or even a JVM. The limitation is that it supports neither agents implemented in Java, nor datastreams as inputs and outputs, nor Flink's state management and fault tolerance.
   - b. A Flink (StateFun) runtime runs the agent as a Flink job, in a distributed manner and with Flink's state management and fault tolerance support.

# Core Concepts

This section discusses the core concepts and primitives for defining an agent. We divide the concepts into 2 categories:

- **Fundamental Concepts:** The minimum set of fundamental and essential concepts and primitives needed for building an agent. These are also the only concepts that the runtime needs to understand and deal with. Keeping the fundamental concepts concise helps maintain a loose coupling between the core and runtime layers, and achieve consistent behavior across different runtimes.
- **Extended Concepts:** Concepts and primitives built on top of the fundamental concepts. These include convenient built-in implementations (e.g., ReAct Agent) and interfaces for integrations (e.g., LLM, Tool).

Regardless of which Agent API / programming language is used, a user-defined agent will be translated into a declarative, language-independent Agent Plan, which will be further executed by different runtimes.

## Fundamental Concepts

- **Agent:** An agent consists of a set of actions, interacting with each other through events.
  - A ReAct-style agent can be described as a special agent, like the following:
    - Start with a reasoning action that invokes an LLM to decide what to do next.
    - Perform the corresponding acting actions (e.g., calling tools) according to the reasoning result.
    - After performing actions, always go back to the reasoning action.
    - Loop until the reasoning action decides to end the agent.
  - By introducing a built-in action that executes another agent, we can build hierarchical agents.
- **Action:** An action is a piece of code that can be executed. Each action listens to at least one type of event. When an event of the listening type occurs, the action will be triggered. An action can also generate new events, to trigger other actions.
- **Event:** Events are messages passed between actions. Events may carry payloads. A single event may trigger multiple actions if they are all listening to its type.
  - There are 2 special types of events:
    - InputEvent: Generated by the framework, carrying an input data record that arrives at the agent. Actions listening to the InputEvent are the entry points of the agent.
    - OutputEvent: The framework listens to OutputEvent, and converts its payload into outputs of the agent. By generating OutputEvent, actions can emit output data.
- **Memory:** Data that will be remembered across multiple runs of the agent.
  - **Short-Term Memory:** Data that will be shared within a thread, across multiple runs and all actions of the agent. We can leverage Flink's keyed state to provide exactly-once consistency for the short-term memory.
  - **Long-Term Memory:** Data that will be shared across multiple threads. Users need to specify an external storage.
- **Thread:** A thread is a sequence of related agent runs. E.g., a conversation between a user and a chatbot may consist of multiple rounds of messages. While each message from the user may trigger a new run of the agent, the entire conversation composes a thread and shares the short-term memory. A thread is like a keyed partition in Flink, where the thread-id corresponds to the key.
- **Resource & Resource Provider:** A resource can be anything that is defined in user code, referenced by its type and name, and used in the agent. A resource provider is a piece of code that gets the resource at runtime. Introducing this concept helps make extended concepts like LLM, prompt, and tool transparent to the runtime. From the perspective of the runtime, they are just resources with different types.
- **Callable:** This is an internal concept and should not be exposed to users. It represents a piece of code, containing the necessary information for executing that code. This allows us to deal with actions and resource providers regardless of the programming language, until we need to actually execute them.
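To make these primitives concrete, the following is a minimal Python sketch of an agent defined in terms of actions and events. It is illustrative only: the `action` decorator, the context object `ctx`, and the class shapes are assumptions made for this document, not the finalized Flink Agents API.

```
# A minimal, hypothetical sketch of the fundamental concepts.
# All names here (Event, action, ctx, ...) are illustrative
# assumptions, not the actual Flink Agents API.
from dataclasses import dataclass
from typing import Any


@dataclass
class Event:
    """Base class for messages passed between actions."""
    payload: Any = None


class InputEvent(Event):
    """Carries an input data record arriving at the agent."""


class OutputEvent(Event):
    """The framework emits its payload as output of the agent."""


def action(*event_types):
    """Marks a method as an action listening to the given event types."""
    def decorator(func):
        func._listens_to = event_types
        return func
    return decorator


class MyAgent:
    """A trivial agent: one entry-point action that echoes each input."""

    @action(InputEvent)
    def handle_input(self, event, ctx):
        # ctx is assumed to expose short-term memory and event emission.
        count = ctx.short_term_memory.get("count", 0) + 1
        ctx.short_term_memory["count"] = count
        # Emitting an OutputEvent makes the framework produce output data.
        ctx.send_event(OutputEvent(payload=(count, event.payload)))
```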
## Extended Concepts

We propose to introduce the following most commonly needed extended concepts for the MVP. More concepts can be added in the future if needed.

- Model (Resource)
  - ChatModel
  - EmbedModel
- Prompt (Resource)
  - Prompt from Python functions / Java methods
  - Prompt from MCP Server
- Tool (Resource)
  - Python functions / Java methods as tools
  - Tools from MCP Server
  - Tools from other ecosystems: LangChain, LlamaIndex, etc. (Not necessarily supported for the MVP.)
- Vector Store (Resource)
- ReAct Agent

### How to build extended concepts on top of fundamental concepts?

Taking ChatModel as an example, we'll have the following built-in implementations:

- **ChatModel** - A new resource type, and an integration interface.
- **ChatModelRequest** - An event, carrying the name of the target ChatModel resource and the input messages.
- **ChatModelResponse** - An event, carrying the model responses.
- **ChatModelAction** - An action that:
  - listens to the ChatModelRequest event
  - gets the ChatModel resource from the target resource provider
  - calls the ChatModel with the input messages
  - generates the ChatModelResponse event with the output messages

To use this feature, users need to:

- Implement the ChatModel interface for the model provider they want to connect.
  - This can be skipped if there's already a built-in / 3rd-party implementation.
- Define a Resource Provider in the agent that initializes and returns the ChatModel.
- Send a ChatModelRequest to invoke the chat model.
- Listen to ChatModelResponse for the model responses.
- Note: Users do not need to define a ChatModelAction in the agent. All the built-in actions should be automatically included.
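As a hedged sketch of what this could look like from the user's side, building on the illustrative definitions in the earlier sketch: the `resource_provider` decorator and `SomeChatModel` are assumptions for illustration, not the finalized API; the event names follow the design above.

```
# Hypothetical sketch of using the ChatModel extended concept.
# Builds on Event, InputEvent, OutputEvent, and action from the
# previous sketch; the remaining names are illustrative assumptions.
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ChatModelRequest(Event):
    """Names the target ChatModel resource and carries input messages."""
    model: str = ""
    messages: List[Dict[str, str]] = field(default_factory=list)


@dataclass
class ChatModelResponse(Event):
    """Carries the model responses in its payload."""


def resource_provider(type: str, name: str):
    """Marks a method as a resource provider for the given type and name."""
    def decorator(func):
        func._resource = (type, name)
        return func
    return decorator


class SomeChatModel:
    """Placeholder for a built-in / 3rd-party ChatModel implementation."""
    def __init__(self, endpoint: str):
        self.endpoint = endpoint


class MyChatAgent:
    @resource_provider(type="chat_model", name="my_llm")
    def my_llm(self):
        # Initialize and return a ChatModel for some model provider.
        return SomeChatModel(endpoint="http://localhost:8000")

    @action(InputEvent)
    def ask_model(self, event, ctx):
        # Invoke the chat model by sending a ChatModelRequest; the built-in
        # ChatModelAction (included automatically) performs the actual call.
        ctx.send_event(ChatModelRequest(
            model="my_llm",
            messages=[{"role": "user", "content": str(event.payload)}],
        ))

    @action(ChatModelResponse)
    def handle_response(self, event, ctx):
        # Forward the model's reply as the agent's output.
        ctx.send_event(OutputEvent(payload=event.payload))
```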
# API

- Short-Term Memory Design: #40
- Resource & Resource Provider Design: #41

# Runtime

We propose to have 2 runtimes:

- **Flink Runtime:** This should be the default runtime for production usage, with full functionality support: multi-language, distributed execution, state management and fault tolerance, integration with Flink's data processing, etc.
- **Local Runtime:** This is a lightweight, single-process runtime that can easily run on your local machine, or even in an IDE. It is mainly for trying-out and testing purposes. It only supports agents implemented completely in Python, and does not support Flink's datastream as input and output.

## Flink Runtime

### Execution

The figure above shows how an agent is executed in the Flink Runtime. The main components are:

- **Dispatcher**, implemented in Java, responsible for dispatching events, scheduling actions for execution, and managing states. This is practically the Function Dispatcher of StateFun, customized for Flink Agents.
- **Python / Java Executor**, implemented in Python / Java respectively, responsible for executing actions that are written in the corresponding language. These are Functions in StateFun.

An agent run works as follows (see the sketch after this list):

1. When a new record is received, the dispatcher emits an InputEvent, with the incoming record as its payload.
2. The dispatcher looks into the agent plan and finds all actions that listen to the event type. It also reads the corresponding state from the state backend.
3. The dispatcher then sends the state and event to the executors, and triggers the actions for execution.
   - a. During the execution of actions, we may need to access resources. For resources implemented in the same language, we can directly invoke the resource provider. For resources implemented in another language, we use [Pemja](https://github.com/alibaba/pemja) for cross-language object access.
4. Once an action is finished, the executor sends the updated state and newly emitted events back to the dispatcher. The updated state is written back to the state backend.
   - a. If the event is an OutputEvent, the dispatcher emits its payload to the output datastream.
   - b. For other events, the dispatcher loops back to step 2.
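The following single-threaded Python sketch illustrates this dispatch loop. It is a simplification under assumed interfaces (`agent_plan`, `state_backend`, `executors`, `emit_output` are hypothetical): the real dispatcher is a Java component based on StateFun that additionally handles state transfer, asynchronous execution, and cross-language access.

```
# Simplified, single-threaded sketch of the dispatcher's event loop.
# Reuses InputEvent / OutputEvent from the earlier sketch; all other
# interfaces here are illustrative assumptions.
from collections import deque


def run_agent(record, key, agent_plan, state_backend, executors, emit_output):
    """One agent run: dispatch events until no more actions are triggered."""
    queue = deque([InputEvent(payload=record)])           # step 1
    while queue:
        event = queue.popleft()
        if isinstance(event, OutputEvent):
            emit_output(event.payload)                    # step 4a
            continue
        # Step 2: look up listening actions in the agent plan, read state.
        state = state_backend.read(key)
        for act in agent_plan.actions_for(type(event)):
            # Step 3: trigger the action on the executor for its language.
            state, new_events = executors[act.language].execute(act, event, state)
            # Step 4: persist updated state; loop new events back to step 2.
            state_backend.write(key, state)
            queue.extend(new_events)
```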
### Deployment

The whole Flink Runtime is like a large custom operator (to be precise, a topology of multiple operators) to Flink.

- From the users' perspective, the Flink Agents framework is a dependency of their jobs. Flink already has mature support for adding Java / Python dependencies, making sure they are accessible in the distributed execution environment.
- From Flink's perspective, it doesn't need to understand these operators, and they are no different from any other custom operators. That means we can deploy the agents just like how we deploy any other Flink jobs: as a Kubernetes / Yarn application, or as a job submitted to a Flink standalone / session cluster.

```
output_datastream = (
    FlinkAgents.from_datastream(input_datastream)
    .apply(my_agent)
    .to_datastream()
)
```

During job compiling, the above code snippet will wrap the user-defined agent into the Flink Runtime operators, append them to the input datastream, and return an output datastream for adding more downstream operators. Here **job compiling** refers to the process of translating the user code into a Flink StreamGraph / JobGraph.

During task initialization, the Flink Runtime operators will bring up the dispatcher, the Java executor, and the Python executor. For the Python executor, we can either launch a Python process, or run it in the JVM with [Pemja](https://github.com/alibaba/pemja). See the [Process and Thread modes](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/python/python_execution_mode/) of Flink Python UDFs for more details.

## Local Runtime

Omitted, as the design of the Local Runtime is quite straightforward.

# Project Structure

```
flink-agents
├── python
│   ├── flink_agents
│   │   ├── __init__.py
│   │   ├── api
│   │   │   ├── __init__.py
│   │   │   ├── ...
│   │   │   └── tests
│   │   ├── examples
│   │   ├── integrations
│   │   ├── plan
│   │   └── runtime
│   ├── pyproject.toml
│   └── requirements
├── api
│   ├── src
│   │   ├── main
│   │   └── test
│   └── pom.xml
├── examples
├── integrations
├── plan
├── runtime
└── pom.xml
```

GitHub link: https://github.com/apache/flink-agents/discussions/9
