GitHub user wenjin272 created a discussion: Comprehensive Memory Management for
Flink-Agents
## Introduction
In the practice of AI agentic systems, the focus is shifting from Prompt
Engineering to Context Engineering, especially for long-running and complex
tasks, and memory management is an essential component of Context Engineering.
There are already some proposals about memory in Flink-Agents. In #92,
@coderplay introduced the role and importance of memory and proposed a
distributed memory system for Flink-Agents. In #40, @xintongsong proposed the
design of short-term memory, which @KeGu-069 has already implemented in
Flink-Agents.
This proposal builds on those previous proposals and the existing development
experience of Flink-Agents, and draws on practices from other agent
frameworks, to present a more comprehensive design for memory management in
Flink-Agents.
## Memory Type
Flink-Agents classifies memory types based on **visibility**, **retention** and
information **accuracy**.
* visibility: The scope of the memory, i.e., whether it is visible across keys.
* retention: The lifecycle of the memory, i.e., when it will be cleared.
* accuracy: The completeness and accuracy of the information stored in memory,
i.e., whether users can retrieve the complete original information.
| | **Single-Key** | **Cross-Key** |
| --- | --- | --- |
| **Single Run** | Sensory Memory | Unsupported |
| **Multiple Runs (precise)** | Short-Term Memory | Unsupported |
| **Multiple Runs (vague)** | Long-Term Memory | Knowledge |
Here:
* Run: An agent Run refers to the processing of a single input event.
* Key: In Flink-Agents, inputs to the agent are partitioned by their keys. This
corresponds to how data is partitioned by keys in Flink's Keyed DataStream.
### Sensory Memory
* **Use Case**: Store temporary data generated during agent execution which
is only needed for a single Run, for example, tool call context, or data
users want to pass to other agent actions within the same Run (see the sketch
below).
* **Characteristics**: Automatically cleaned up by the framework after an agent
Run finishes.
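A minimal sketch of the single-Run use case, assuming plain action functions that receive a sensory memory handle; the function names and signatures are illustrative, and only the `set`/`get` calls follow the API described later in this proposal.
```python
# Hypothetical sketch: one action stores tool-call context that a later action
# in the same Run reads back; after the Run finishes, the framework clears it.
def plan_action(event, sensory_memory):
    sensory_memory.set("tool_call_context", {"tool": "search", "query": str(event)})

def execute_action(event, sensory_memory):
    context = sensory_memory.get("tool_call_context")
    # ... invoke the tool using the stored context ...
```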
### Short-Term Memory
* **Use Case**: Store data generated during agent execution, but
    * compared to Sensory Memory, the lifecycle of the stored data can span
multiple Runs.
    * compared to Long-Term Memory, the stored data is small in size, can be
stored in Flink state, and requires precise information retrieval.
* **Characteristics**:
    * The lifecycle of the stored data can span multiple Runs.
    * Small in size, with precise information retrieval.
### Long-Term Memory
* **Use Case**: Store data generated during agent execution, but compared
to Short-Term Memory
    * the stored data may grow rapidly as the agent executes, which requires
**compaction** to control the storage usage.
    * when retrieving data from a large amount of memory in Long-Term Memory, it
may not be possible to recall the complete data, and the data itself may have
been compacted.
* **Characteristics**: Supports compaction of stored data; stored information
may be lossy; supports semantic search.
### Knowledge
* **Use Case**: Store facts, experience, and skills discovered, generated, or
learned during agent execution, as well as prior knowledge provided by
users. The stored data can be shared by any Flink job and agent.
* **Characteristics**: The ownership is independent of the agent and Flink job,
so the Flink-Agents framework will not manage the data stored in it; the stored
data is visible to any Flink job and agent.
## Sensory Memory
### Data Structure
The data structure of Sensory Memory is the same as that of Short-Term Memory
in #40; we provide a more detailed description here.
Sensory Memory stores data as key-value pairs, where the key is the field name
and only supports the string type, while the value is the field content and
supports the following types:
* primitive types
```python
sensory_memory.set("name", "john")
name: str = sensory_memory.get("name")
```
* list
```python
sensory_memory.set("nums", [1, 2, 3, 4, 5])
nums: List[int] = sensory_memory.get("nums")
```
* map
```python
sensory_memory.set("user", {"name": "john", "age": 13})
user: Dict[str, Any] = sensory_memory.get("user")
```
* object
```python
sensory_memory.set("user", User(name="john", age=13))
user: User = sensory_memory.get("user")
```
Because Sensory Memory is based on Flink MapState, the type constraints for
stored objects are the same as for Flink state.
Sensory Memory also supports storing nested objects as a tree structure.
```python
# store data to memory
user: MemoryObject = sensory_memory.new_object("user")
user.set("name", "john")
user.set("age", 13)
# which is equivalent to
sensory_memory.set("user.name", "john")
sensory_memory.set("user.age", 13)

# retrieve data from memory
user: MemoryObject = sensory_memory.get_object("user")
name: str = user.get("name")
age: int = user.get("age")
# which is equivalent to
name: str = sensory_memory.get("user.name")
age: int = sensory_memory.get("user.age")
```
### Auto Clean Up
The Flink-Agents framework is responsible for clearing the Sensory Memory at
the end of the Agent Run.
### Storage
Sensory Memory is based on Flink MapState.
## Short-Term Memory
The design of Short-Term Memory is the same as that of Sensory Memory. The only
difference is that Flink-Agents does not clear the data stored in Short-Term
Memory when an agent Run finishes.
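To illustrate the difference, here is a hedged sketch of keeping a per-key chat history across Runs; the action signature is an assumption, and so is the behavior of `get` returning a falsy value for a missing key.
```python
# Hypothetical sketch: history written in one Run is still visible in the next
# Run for the same key, because Short-Term Memory is not cleared automatically.
def chat_action(event, short_term_memory):
    # Assumes get() returns None (or another falsy value) for a missing key.
    history = short_term_memory.get("chat_history") or []
    history.append({"role": "user", "content": str(event)})
    short_term_memory.set("chat_history", history)
```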
## Long-Term Memory
The major features of Long-Term Memory are support for compaction of stored
data and semantic search.
### Data Structure
Long-Term Memory stores data as named memory sets. Each memory set contains a
set of memory items. In the first version, memory items only support the string
type and the built-in `ChatMessage`, and will be extended to any
JSON-serializable object in the future.
* Create
```python
memset: MemorySet = long_term_memory.create_memory_set(
name="chat_history",
type=ChatMessage,
capacity=1000,
capacity_manage_strategy=CapacityManageStrategy.SUMMARIZE,
)
```
When creating a memory set in Long-Term Memory, the user should specify:
* name: The name of the memory set.
* type: The type of the memory items.
* capacity: The capacity of the memory set. Flink-Agents will trigger
compaction to prevent the number of memory items from exceeding the capacity.
* capacity_manage_strategy: The strategy for reducing memory items when
compaction is triggered.
* Write & Read
```python
memset: MemorySet = long_term_memory.get_memory_set("chat_history")
memset.add(item=ChatMessage(role="user", content="..."))
items: List[MemorySetItem] = memset.get()
```
Here, the `MemorySetItem` contains the following fields (see the sketch after
this list of operations):
* memory_set_name: The name of the memory set this item belongs to.
* item: The value of the memory item.
* compacted: Whether this item has been compacted.
* created_time: Timestamp, or timestamp range, for when this memory item was
created.
* last_accessed_time: Timestamp of the last time this memory item was
accessed.
* Get Recent N
```python
memset: MemorySet = long_term_memory.get_memory_set("chat_history")
items: List[MemorySetItem] = memset.get_recent(n=10)
```
* Semantic Search
```python
memset: MemorySet = long_term_memory.get_memory_set("chat_history")
items: List[MemorySetItem] = memset.search(query="...", limit=10)
```
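As a rough illustration, `MemorySetItem` could be modeled as a plain dataclass like the following; the field types, in particular the timestamp range after compaction, are assumptions rather than the actual Flink-Agents class.
```python
from dataclasses import dataclass
from typing import Any, Tuple, Union

# Hypothetical model of MemorySetItem; the real class may differ.
@dataclass
class MemorySetItem:
    memory_set_name: str                       # memory set this item belongs to
    item: Any                                  # stored value, e.g. str or ChatMessage
    compacted: bool                            # whether this item has been compacted
    created_time: Union[int, Tuple[int, int]]  # timestamp, or range after compaction
    last_accessed_time: int                    # timestamp of the last access
```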
### Storage
Long-Term Memory will be based on a vector store.
* In the long run, Long-Term Memory will consist of both local and remote
components. The local data is regarded as hot data, which has better
performance but limited space. The remote data is regarded as cold data,
which has poorer IO performance but unlimited space (a read-path sketch
follows this list).
* In the first version, we will only support storing the data in a remote
vector store.
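A hedged sketch of the hot/cold read path implied by the two tiers; `local_store` and `remote_store` are hypothetical interfaces used only for illustration.
```python
from typing import Any, List

# Hypothetical tiered search: prefer the local (hot) tier and fall back to the
# remote (cold) tier when the local tier cannot satisfy the request.
def tiered_search(query: str, limit: int, local_store: Any, remote_store: Any) -> List[Any]:
    hits = local_store.search(query=query, limit=limit)
    if len(hits) < limit:
        hits += remote_store.search(query=query, limit=limit - len(hits))
    return hits
```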
### Compaction
Flink-Agents will apply compaction to a memory set to reduce the number of
memory items when its size is close to the capacity.
#### Capacity Manage Strategy
* Flink-Agents will provide some built-in strategies (a sketch of the trim
strategy follows this list), such as:
    * trim: remove the n oldest memory items.
    * summarize: use an LLM to summarize the memory items.
* User-defined strategies will be supported in the future.
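A minimal sketch of what the trim strategy could do, assuming memory items are ordered oldest-first; this is an illustration, not the framework implementation.
```python
from typing import Any, List

# Hypothetical trim compaction: once the set reaches its capacity,
# drop the n oldest items (items are assumed to be ordered oldest-first).
def trim(items: List[Any], capacity: int, n: int) -> List[Any]:
    if len(items) >= capacity:
        return items[n:]
    return items
```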
#### Compaction Execution
Because the compaction of Long-Term Memory involves both reads and writes to the
vector store, it may become a bottleneck for agent execution performance. To
resolve this issue, we can either execute the compaction asynchronously or use
a dedicated Flink job to maintain the Long-Term Memory.
* Async Execution: The compaction is executed inside the agent job. When a
user adds an item to the memory set, the framework checks the free space and
may submit an async compaction task (a sketch follows below).
* Dedicated Compaction Job: The compaction is executed outside the agent
job. The compaction job monitors the Long-Term Memory and executes
compaction at the appropriate time.
In the first version, we will only support async execution.
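A hedged sketch of the async path, assuming a hypothetical `compact()` helper, a `capacity` attribute on the memory set, and a 90% threshold; none of these are part of the proposed API.
```python
import concurrent.futures

# Single background worker so compaction does not block agent actions.
_compaction_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

def compact(memory_set) -> None:
    # Placeholder for the configured capacity-management strategy
    # (e.g. trim or summarize).
    ...

def add_with_async_compaction(memory_set, item) -> None:
    memory_set.add(item=item)
    # If the set is close to its assumed capacity, hand compaction off to a
    # background thread instead of doing vector-store IO on the hot path.
    if len(memory_set.get()) >= 0.9 * memory_set.capacity:
        _compaction_pool.submit(compact, memory_set)
```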
### Per Action Consistency
Flink-Agents provides per-action consistency guarantees. For the state-based
Sensory Memory and Short-Term Memory, Flink-Agents uses a write-ahead log (WAL)
to record updates to the state and reapplies these updates during recovery. For
Long-Term Memory, since updates are written directly to the external system (in
the first version), these updates should be skipped during replay (a sketch
follows).
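A minimal sketch of skipping such external writes during replay, assuming a hypothetical `is_replaying` flag on the runtime context; this is not an existing Flink-Agents API.
```python
# Hypothetical replay guard: writes that already reached the external vector
# store before the failure are not re-applied during recovery.
def replay_safe_add(ctx, memory_set, item) -> None:
    if getattr(ctx, "is_replaying", False):
        return
    memory_set.add(item=item)
```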
In the current implementation of per-action consistency, Flink-Agents skips
completed actions during recovery, but partially executed actions are
re-executed from the beginning. Therefore, to ensure exactly-once semantics for
Long-Term Memory, further designs, such as an action-granularity two-phase
commit mechanism, are needed.
However, Flink-Agents is also considering modifying the existing per-action
consistency implementation to achieve finer-grained consistency guarantees;
therefore, the consistency issues of Long-Term Memory will not be addressed in
the first version.
## Knowledge
### Data Structure
Similar to Long-Term Memory, Knowledge stores data as named knowledge sets.
Each knowledge set contains a set of knowledge items. A knowledge item can be
any JSON-serializable object.
```python
knowledge_set: KnowledgeSet = knowledge.create_knowledge_set(name="profile")
knowledge_set.add(item={"name": "john", "interest": "table tennis"})
profiles: List[KnowledgeSetItem] = knowledge_set.get()
related_profiles: List[KnowledgeSetItem] = knowledge_set.search(query="table tennis", limit=10)
```
Here, the `KnowledgeSetItem` contains the following fields:
* name: The name of the knowledge set this item belongs to.
* value: The content of this knowledge item.
* created_time: Timestamp for when this item was created.
* updated_time: Timestamp for when this item was last updated.
### Storage
Knowledge is stored in a vector store independent of the Flink job.
## Compared to LangGraph
LangGraph provides Short-Term Memory and Long-Term Memory:
* Short-term memory, or thread-scoped memory, tracks the ongoing conversation
by maintaining message history within a session. LangGraph manages short-term
memory as a part of your agent’s state. State is persisted to a database using
a checkpointer so the thread can be resumed at any time. Short-term memory
updates when the graph is invoked or a step is completed, and the State is read
at the start of each step.
* Long-term memory stores user-specific or application-level data across
sessions and is shared across conversational threads. It can be recalled at any
time and in any thread. Memories are scoped to any custom namespace, not just
within a single thread ID. LangGraph provides stores to let you
save and recall long-term memories.
Compared to LangGraph, the memory of Flink-Agents provides similar
functionality and has additional features.
* Sensory Memory and Short-Term Memory of Flink-Agents are similar to
Short-Term Memory of LangGraph, and achieve persistence and fault tolerance
based on the Flink checkpoint mechanism. In addition, Sensory Memory also
supports auto clean up after each Run.
* Long-Term Memory of Flink-Agents is special. Its visibility is similar to
Short-Term Memory of LangGraph, but its retrieval is similar to Long-Term
Memory of LangGraph. In addition, Long-Term Memory of Flink-Agents supports
capacity management.
* Knowledge of Flink-Agents is similar to Long-Term Memory of LangGraph.
GitHub link: https://github.com/apache/flink-agents/discussions/339