GitHub user wenjin272 created a discussion: Comprehensive Memory Management for 
Flink-Agents

## Introduce
In the practice of AI agentic system, the focus is shifting from Prompt 
Engineering to Context Engineering, especially for long-running and complex 
tasks. And memory management is an essential component of Context Engineering.

There are already some proposals about memory in Flink-Agents. In #92, 
@coderplay introduced the role and importance of memory, and proposed a 
distributed memory system for Flink-Agents. In #40, @xintongsong proposed the 
design of short-term memory and @KeGu-069 has already implemented it in 
Flink-Agents. 

This proposal, based on the previous proposals and the existing development 
experience of Flink-Agents, and drawing on practices from other agent 
frameworks, try to present a more comprehensive design for memory management in 
Flink-Agents.

## Memory Type
Flink-Agents classifies memory types based on **visibility**, **retention** and 
information **accuracy**.
* visibility: The scope of the memory, i.e., whether it is visible across keys.
* retention: The lifecycle of the memory, i.e., when it will be cleared.
* accuracy: The completeness and accuracy of information stored in memory, 
i.e., whether uses can retrieve the complete original information.


|  | **Single-Key** | **Cross-Key** |
| --- | --- | --- |
| **Single Run** | Sensory Memory | Unsupported |
| **Multiple Runs (precise)** | Short-Term Memory | Unsupported
| **Multiple Runs (vague)** | Long-Term Memory | Knowledge |

Here
* Run: An agent run refers to the processing of a single input event.
* Key: In Flink Agents, inputs of the agent are partitioned by their keys. This 
corresponds to how data are partitioned by keys in Flink's Keyed DataStream.

### Sensory Memory
* **Use Case**: Store temporary data generated during the agent execution which 
is only needed for a single Run. For example, tool call context, or the data 
users want to pass to other Agent actions.
* **Characteristics**: Automatically cleaned up by the framework after an Agent 
run finished.

### Short-Term Memory
* **Use Case**: Store data generated during the agent execution, but 
  * compared to Sensory Memory, the lifecycle of stored data can across 
multiple Runs.
  * compared to Long-Term Memory, the stored data is small in size, can be 
stored in Flink State, and requires precise information retrieval.
* **Characteristics**:
  * The lifecycle of the stored data can across multiple Runs.
  * Small in size and precise information retrieval.

### Long-Term Memory
* **Use Case**: Store data generated during the agent execution, but compared 
to Short-Term Memory
  * the stored data may expand rapidly as the agent execution, which requires 
**compaction** to control the storage usage.
  * when retrieving data from a large amount of memory in Long-Term Memory, it 
may not be possible to recall complete data, and the data itself may be 
compacted.
* **Characteristics**: Support for compaction of stored data; Loss of stored 
information; Support for semantic search. 

### Knowledge
* **Use Case**: Store fact, experience and skill discoverd, generated or 
learned during the Agent execution, as well as some prior knowledge provided by 
users. The stored data can be shared by and flink job and agent.
* **Characteristics**: The ownership is independent of the agent and flink job, 
so the Flink-Agents framework will not manage the data stored in it; The stored 
data is visible to any flink job and agent.

## Sensory Memory
### Data Structure
The data structure of Sensory Memory is same as Short-Term Memory #40, we 
provide a more detailed description here.

Sensory Memory store data as key-value pairs, where key is the field name and 
only supports string type, value is field content, and supports the following 
types:
* primitive types
    ```python
    sensory_memory.set("name", "john")
    name: str = sensory_memory.get("name")
    ```
* list
  ```python
  sensory.set ("nums", [1, 2, 3, 4, 5])
  nums: List [Int] = sensory_memory.get("nums")
  ```
* map
  ```python
  sensory_memory.set("user", {"name": "john", "age": 13})
  user: Dict[str, Any] = sensory_memory.get("user")
  ```
* object
  ```python
  sensory_memory.set("user", User(name="john", age=13))
  user: User = sensory_memory.get("user")
  ```
  For Sensory Memory is based on Flink map state, the constraint for types of 
object is same as Flink state.

Sensory Memory also supports store nested object as a tree structure.
```python
# store data to memory
user: MemoryObject = sensory_memory.new_object("user")
user.set("name", "jhon")
user.set("age", 13)
# which is equal to
user.set("user.name", "jhon")
user.set("user.age", 13)

# retrieve data from memory
user: MemoryObject = sensory_memory.get_object("user")
name: str = user.get("name")
age: int = user.get("age")
# which is equal to 
name: str = sensory_memory.get("user.name")
age: int = sensory_memory.get("user.age")
```
### Auto Clean Up
The Flink-Agents framework is responsible for clearing the Sensory Memory at 
the end of the Agent Run.

### Storage
Sensory Memory is based on flink MapState.

## Short-Term Memory
The design of Short-Term Memory is same as the Sensory Memory. The only 
difference is Flink-Agents won't clear the stored data in Short-Term Memory 
when agent Run finished.

## Long-Term Memory
The major feature of Long-Term Memory is supporting for compaction of stored 
data and semantic search.

### Data Structure
Long-Term Memory stores data as named memory set. Each memory set contains a 
set of memory items. In the first version, the memory item only supports string 
type and built-in `ChatMessage`, and will be extended to any json serializable 
object in the future.

* Create
  ```python
  memset: MemorySet = long_term_memory.create_memory_set(
      name="chat_history",
      type=ChatMessage,
      capacity=1000,
      capacity_manage_strategy=CapacityManageStrategy.SUMMARIZE,
  )
  ```
  When create a memory set in Long-Term Memory, user should specify:
  * name: The name of the memory set
  * type: The type of the memory item
  * capacity: The capacity of the memory set. Flink-Agents will trigger 
compaction to avoid the count of memory items exceed the capacity.
  * capacity_manage_strategy: The strategy for how to reduce memory items when 
compaction triggered.

* Write & Read 
  ```python
  memset: MemorySet = long_term_memory.get_memory_set("chat_history")
  memset.add(item=ChatMessage(role=, content=))

  items: List[MemorySetItem] = memset.get()
  ```

  Here, the `MemorySetItem` contains:
  * memory_set_name: The name of the memory set this item belong to.
  * item: The value of the memory item.
  * compacted: Whether this item has been compacted.
  * created_time: Timestamp or timestamp range for when this memory item was 
created.
  * last_accessed_time: Timestamp for the last time this memory item was 
accessed.

* Get Recent N
  ```python
  memset: MemorySet = long_term_memory.get_memory_set("chat_history")
  items: List[MemorySetItem] = memset.get_recent(n=10)
  ```
* Semantic Search
  ```python
  memset: MemorySet = long_term_memory.get_memory_set("chat_history")
  items: List[MemorySetItem] = memset.search(query=, limit=)
  ```

### Storage
Long-Term Memory will be based on vector store.
* In the long run, Long-Term Memory will consist of both local and remote 
components. The local data is regarded as hot data, which has better 
performance but has space limitation. The remote data is regarded as cold data, 
which has poor IO performance but unlimited space.
* In the first version, we will only support store the data in remote vector 
store.

### Compaction
Flink-Agents will apply compaction to memory set to reduce memory items when 
the size is close to the capacity.

#### Capacity Manage Strategy
* Flink-Agents will provide some built-in strategies, such
  * trim: remove the n oldest memory items.
  * summarize: use llm to summarize the memory items
* User defined strategy will be supported in the future.

#### Compaction Execution
Because the compaction of Long-Term Memory involves both read and write to the 
vector store, it may become a bottleneck for agent execution performance. To 
resolve this issue, we can rather execute the compaction asynchronously or use 
an individual flink job to maintain the Long-Term Memory.

* Async Execution: The compaction will be executed inside the agent job. When 
user add item to the memory set, framework will check the free space and may 
submit an async compaction task.
* Dedicated Compaction Job: The compaction will be executed outside the agent 
job. The compaction job will monitor the Long-Term Memory and execute 
compaction at the appropriate time.

In the first version, we will only support async execution.

### Per Action Consistency

Flink-Agents provides per action consistency guarantees. For state-based 
Sensory Memory and Short-Term Memory, Flink-Agents uses wal to record updates 
to the state and reapplies these updates during recover. For Long-Term Memory, 
since updates are directly written to the external system (in the first 
version), these updates should be skipped during replay.

In the current implementation of per action consistency, Flink-Agents skips 
completed actions during recovery, but partially executed actions are 
re-executed from the beginning. Therefore, to ensure exactly-once semantics for 
Long-Term Memory, further designs such as action-granularity two-phase commit 
mechanism are needed.

However, Flink-Agents is also considering modifying the existing per-action 
consistency implementation to achieve more finer-grained consistency 
guarantees, therefore, the consistency issues of Long-Term Memory will not be 
addressed in the first version.

## Knowledge

### Data Structure
Similar to Long-Term Memory, Knowledge stores data as stores data as named 
knowledge set. Each knowledge set contains a set of knowledge items. The 
knowledge item can be any json serializable object.

```python
knowledge_set: KnowledgeSet = knowledge.create_knowledge_set(name="profile")

knowledge_set.add(item={"name": "john", "interest": "table tennis"})

profiles: List[KnowledgeSetItem] = knowledge_set.get("profile")

related_profiles: List[KnowledgeSetItem] = knowledge_set.search(query=, limit=)
```

Here, the `KnowledgeSetItem` contains:
* name: The name of the knowledge set this item belong to.
* value: The content of this knowledge item.
* created_time: Timestamp for when this item was created.
* updated_time: Timestamp for when this item was updated.
  
### Storage
Knowledge is stored in a vector store independent of flink job.

## Compared to LangGraph
LangGraph provides Short-Term Memory and Long-Term Memory:
* Short-term memory, or thread-scoped memory, tracks the ongoing conversation 
by maintaining message history within a session. LangGraph manages short-term 
memory as a part of your agent’s state. State is persisted to a database using 
a checkpointer so the thread can be resumed at any time. Short-term memory 
updates when the graph is invoked or a step is completed, and the State is read 
at the start of each step.
* Long-term memory stores user-specific or application-level data across 
sessions and is shared across conversational threads. It can be recalled at any 
time and in any thread. Memories are scoped to any custom namespace, not just 
within a single thread ID. LangGraph provides stores (reference doc) to let you 
save and recall long-term memories.

Compared to LangGraph, the memory of Flink-Agents can provide similar 
functionality and has additional features.
* Sensory Memory and Short-Term Memory of Flink-Agents are similar to 
Short-Term Memory of LangGraph, and achieves persistence and fault tolerance 
based on Flink checkpoint mechanism. Besides, Sensory Memory also supports auto 
clean up after each Run.
* Long-Term Memory of Flink-Agents is special. The visibility is similar to 
Short-Term Memory of LangGraph, but the retrieval is similar to Long-Term 
Memory of LangGraph. Besides, Long-Term Memory of Flink-Agents supports 
capacity management.
* Knowledge of Flink-Agents is similar to Long-Term Memory of LangGraph.






GitHub link: https://github.com/apache/flink-agents/discussions/339

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to