GitHub user letaoj edited a discussion: Replay-based Per Action State Consistency
# Background

Flink Agents execute various actions during record processing, including model inference and tool invocation. Model inference involves calling LLMs for reasoning, classification, or generation tasks, often through expensive API calls to external providers. Tool invocation allows agents to interact with external systems through UDFs with network access, with native support for Model Context Protocol (MCP). These actions enable agents to perform contextual searches, execute business logic, interact with enterprise systems, and invoke specialized processing services.

# Problem

## Side Effects and Costs from Action Replay

While Flink provides exactly-once processing guarantees for stream processing on a per-message basis, agent actions create challenges around side effects, costs, and recovery semantics. Both model inference and tool invocation can produce effects that persist beyond the agent's execution context or incur significant costs that should not be duplicated.

The core problem occurs when:

1. A Flink agent processes a record and executes multiple actions (model calls, tool calls).
2. Some actions complete successfully, potentially modifying external state or consuming costly resources.
3. The agent crashes before completing record processing.
4. Upon recovery, Flink reprocesses the same record, repeating all actions.

This creates several issues:

* Duplicate side effects in external systems (e.g. sending the same email multiple times to the same recipient)
* Unnecessary costs from repeated expensive model inference calls
* Consistency violations from a mix of successful and repeated actions
* Resource waste from redundant operations
* Observability challenges when debugging multiple executions of the same logical operation

## Non-Deterministic Model Outputs

A critical additional challenge is that model inference and generation are inherently non-deterministic.
Repeating a model call with identical inputs may produce different outputs due to sampling, temperature settings, or model provider variations. This creates severe consistency problems when model outputs drive downstream decisions such as reasoning chains or tool selection.

Consider this scenario: an agent makes a model call that decides to invoke Tool A, but crashes before completion. Upon recovery, the same model call with identical inputs may decide to invoke Tool B instead. This leaves the system in an inconsistent state where Tool A was already executed based on the first decision, but the agent now wants to execute Tool B based on the second decision. The best approach is to ensure the model never makes the same decision twice: the original model output should be preserved and reused during recovery.

Flink's streaming architecture introduces additional complexity through continuous processing on unbounded streams, distributed state management, back-pressure from action failures, and a semantic gap where exactly-once guarantees don't extend to external model providers or tool endpoints.

# Goals and Non-Goals

## Goals

* **Durability of Agent State (short-term memory)**: Ensure that the state can be recovered when the agent crashes.
* **State Recovery**: Provide a mechanism to recover the agent's short-term memory by replaying the action history from the state store.
* **Minimal Performance Overhead**: The solution should have minimal impact on the performance of the Flink job during normal operation.
* **Pluggable Architecture**: The design should be flexible enough to support different types of storage systems.

## Non-Goals

* **Long-Term Memory**: This design focuses on recovering the short-term memory of the current task. It does not address the problem of long-term memory or knowledge persistence.
* **Database Management**: This design assumes that the external storage system is already set up and managed.
It does not cover the details of database administration.

# High-Level Design

## Execution Flow for Static Agent

```mermaid
sequenceDiagram
    participant A as Agent
    participant M as Short-term Memory
    participant SS as State Store
    A->>SS: Check if the key exists or not
    opt key exists
        SS->>A: saved state
        A-->M: rebuild from saved state (action to task action state map)
    end
    A->>A: receive Events (Chat Model/Tool/Prompt)
    A->>Chat Model/Tool/Prompt: take action(s)
    A->>A: check in-memory map to see if the key (message key + hash of action) exists or not
    alt not exists
        A->>SS: persist the input and a snapshot of the short-term memory
        A->>Chat Model/Tool/Prompt: send request
        Chat Model/Tool/Prompt->>A: receive response
        A->>SS: persist the output
        A->>A: process response
        opt modify short-term memory
            A->>SS: persist the modification(s)
            A->>M: update short-term memory
        end
        A->>A: generate the output event(s)
        A->>SS: persist the output event(s)
    else exists
        A->>A: get the task action state from memory
        A->>M: apply short term memory modification
    end
    A->>A: send output event
```

From the diagram above, the saved state evolves as follows:

- `<message_key>-<action_hash_1>: {"request": request}`
- `<message_key>-<action_hash_1>: {"request": request, "response": response}`
- `<message_key>-<action_hash_1>: {"request": request, "response": response, "short-term-memory-updates": [...]}`
- `<message_key>-<action_hash_1>: {"request": request, "response": response, "short-term-memory-updates": [...], "output_event": [output_event]}`
- `<message_key>-<action_hash_2>: ...`
- `<message_key>-<action_hash_2>: ...`
- `<message_key>-<action_hash_2>: ...`
- `<message_key>-<action_hash_2>: ...`
- `<message_key>-<action_hash_3>: ...`

## APIs

### Task Action State

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TaskActionState:
    """
    Data class to capture the TaskAction state

    Attributes
    ----------
    request: str
        The request string associated with the task action.
    response: str
        The response generated by the agent.
    memory_updates: List[MemoryUpdate]
        List of memory updates for recovery after crash.
    output_events: List[Event]
        The output events produced by the agent.
    """

    request: str
    response: str
    memory_updates: List[MemoryUpdate]
    output_events: List[Event]
```

### Memory Updates

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class MemoryUpdate:
    """
    Class to capture all the memory updates so that they can be used to
    recover the short-term memory after an agent crash

    Attributes
    ----------
    path: str
        The path in memory where the update occurred.
    value: Any
        The value that was updated at the specified path.
    """

    path: str
    value: Any
```

### Action Result Store

The action result store (`TaskActionStore`) is an abstraction layer over the external database. It handles serialization and deserialization of `TaskActionState`.

```python
from abc import ABC, abstractmethod
from typing import List


class TaskActionStore(ABC):
    """
    Abstraction layer to the external database
    """

    @abstractmethod
    def put(self, key: str, action: Action, value: TaskActionState):
        """
        Persist the object into the external database

        Parameters
        ----------
        key: str
            The key of the message being processed by the agent
        action: Action
            The action the agent is now taking
        value: TaskActionState
            The TaskActionState associated with one action for that message key
        """

    @abstractmethod
    def get(self, key: str, action: Action) -> TaskActionState:
        """
        Get the TaskActionState for a message key and action

        Parameters
        ----------
        key: str
            The key of the message
        action: Action
            The action whose state is requested

        Returns
        -------
        TaskActionState
            The TaskActionState associated with the key and action.
        """

    @abstractmethod
    def list(self, key: str) -> List[TaskActionState]:
        """
        List all the TaskActionState associated with the given message key

        Parameters
        ----------
        key: str
            The key of the message

        Returns
        -------
        List[TaskActionState]
            List of all the task action states
        """
```

## External Database

### Database Consideration

Below are some characteristics of the agent state to consider when picking the right external DB:

* Low QPS - Agent state updates typically arrive at a low rate of Queries Per Second (QPS), because LLM calls and tool invocations have long response times. Consequently, the storage system only needs to accommodate low QPS.
* Durability - The agent state is crucial for action recovery following runtime failures and for debugging purposes. Therefore, the storage system must ensure persistence.
* Strong Consistency - To facilitate rapid recovery from the state store after a crash, the agent state must maintain strong consistency.
* High Data Volume - Given that both model and tool responses will be stored in the state store, the data size can become substantial. Thus, the state store should be capable of managing data at the megabyte level in extreme scenarios, such as retrieving emails from an email server.
* Ranged Get - During recovery, the agent issues a ranged get based on the key it generated, so the external database needs to support ranged gets.

### Data Retention

To prevent unbounded growth in backend storage, we need to implement a data retention policy, since this data is only required for failure recovery. Once a Flink checkpoint is successfully committed, we should automatically delete all data that precedes that checkpoint, keeping storage manageable while maintaining the necessary recovery capabilities. This can be achieved by listening for and acting on the `notifyCheckpointComplete` notification that Flink sends after each checkpoint.

### Viable solution

A practical solution for the external database is to use Kafka.
Every state update is written to Kafka as a separate message. The per-partition offset is recorded in the Flink state. During recovery, the Flink agent takes the offsets from the latest checkpoint and reads from those offsets to the end of each partition to recover the task action state.

Using Kafka as the data store has a couple of drawbacks:

* Data retention: Kafka does not support explicitly deleting individual messages; removal is governed by the retention period setting. We will therefore need to set a reasonable retention period to keep the volume of state data low.
* Partition management: When the Flink agent is rescaled, the number of Kafka partitions stays fixed, so two Flink agent instances may end up reading from the same partition, causing read amplification. However, Kafka is consumed only during recovery, so the extra latency should still be acceptable.

GitHub link: https://github.com/apache/flink-agents/discussions/108
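
As an illustration of the replay flow in the High-Level Design section, here is a minimal Python sketch. It is not the Flink Agents implementation. `InMemoryActionStore`, `action_key`, and `run_action` are hypothetical names introduced only for this example, and the dictionary-backed store stands in for whatever external database is chosen. The key follows the `<message_key>-<action_hash>` pattern from the design. The sketch also assumes that an action whose response was never persisted is simply called again.

```python
import copy
import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class MemoryUpdate:
    path: str
    value: Any


@dataclass
class TaskActionState:
    request: str
    response: Optional[str] = None
    memory_updates: List[MemoryUpdate] = field(default_factory=list)


class InMemoryActionStore:
    """Hypothetical stand-in for the external state store."""

    def __init__(self) -> None:
        self._data: Dict[str, TaskActionState] = {}

    def put(self, key: str, state: TaskActionState) -> None:
        # Copy on write so later in-process mutation does not leak into the store.
        self._data[key] = copy.deepcopy(state)

    def get(self, key: str) -> Optional[TaskActionState]:
        return copy.deepcopy(self._data.get(key))


def action_key(message_key: str, action_name: str, request: str) -> str:
    """Build '<message_key>-<action_hash>' from the action name and request."""
    payload = json.dumps([action_name, request]).encode("utf-8")
    return f"{message_key}-{hashlib.sha256(payload).hexdigest()[:16]}"


def run_action(
    store: InMemoryActionStore,
    memory: Dict[str, Any],
    message_key: str,
    action_name: str,
    request: str,
    call: Callable[[str], str],
) -> Optional[str]:
    key = action_key(message_key, action_name, request)
    state = store.get(key)
    if state is None or state.response is None:
        # No recorded response: persist the request, call out, persist the result.
        state = TaskActionState(request=request)
        store.put(key, state)
        state.response = call(request)
        state.memory_updates.append(
            MemoryUpdate(f"{action_name}.last_response", state.response)
        )
        store.put(key, state)
    # Both paths apply the recorded updates, so a replay rebuilds short-term memory.
    for update in state.memory_updates:
        memory[update.path] = update.value
    return state.response
```

Calling `run_action` twice with the same message key, action name, and request invokes `call` only once. The second call returns the stored response and reapplies the recorded memory updates, which is the behavior the design relies on to avoid duplicate side effects and diverging model decisions.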