GitHub user coderplay created a discussion: Flink-Agents Memory Design
## Introduction

Memory is fundamental to human intelligence: it shapes identity, guides decisions, and enables learning, adaptation, and meaningful relationships. In communication, memory allows us to recall past interactions, infer preferences, and maintain coherent, context-rich exchanges over long periods. In contrast, current AI agents powered by large language models (LLMs) are limited by fixed context windows and lack persistent memory, leading to forgetfulness, contradictions, and a diminished user experience. Even as LLMs' context windows grow, they cannot match the human ability to retain and retrieve relevant information across sessions and topics. This limitation is especially problematic in domains requiring continuity and trust, such as healthcare and education.

To overcome these challenges, AI agents need robust memory systems that can selectively store, consolidate, and retrieve important information, mirroring human cognition. Such systems will enable AI agents to maintain consistent personas, track evolving user preferences, and build upon prior exchanges, transforming them into reliable, long-term collaborators.

## Proposal

To address the critical limitations of current AI agents (no persistent memory and poor long-term relationship building), we propose a distributed memory system for flink-agents that mirrors human cognitive processes. The solution implements three memory types: **Sensory Memory** (internal events), **Short-term Memory** (recent history), and **Long-term Memory** (semantic retrieval). It leverages Flink's distributed state management with a hybrid backend combining a history store and a vector store, directly addressing LLM context window limitations through persistent, searchable memory.

Additionally, **Knowledge** provides shared long-term memory accessible across all agent instances. It is stored externally without Flink checkpointing, enabling domain-specific expertise and consistent collaboration in trust-critical domains.

## Memory

### High-Level Architecture

The Flink-Agents Memory System introduces a **hybrid state backend** that combines two specialized state backends:

- **History Store**: Tracks all memory operations for audit trails, and handles retrieval for short-term memories. The default implementation of the history store uses Flink's built-in state backends.
- **Vector Store**: Provides vector search capabilities for long-term semantic memory retrieval. The default implementation is based on embedded Lucene instances.

This hybrid approach lets each store serve the access pattern it is built for (time-ordered history reads or similarity search) rather than forcing both onto a single backend, with the aim of optimizing both storage efficiency and search performance.

```mermaid
graph LR
    subgraph "Flink Cluster"
        subgraph "Task Manager 1"
            A1[Agent Instance 1]
            M1[Memory 1]
            subgraph "Hybrid State Backend 1"
                S1_R[History Store]
                S1_L[Vector Store]
            end
        end

        subgraph "Task Manager 2"
            A2[Agent Instance 2]
            M2[Memory 2]
            subgraph "Hybrid State Backend 2"
                S2_R[History Store]
                S2_L[Vector Store]
            end
        end

        subgraph "Task Manager N"
            AN[Agent Instance N]
            MN[Memory N]
            subgraph "Hybrid State Backend N"
                SN_R[History Store]
                SN_L[Vector Store]
            end
        end
    end

    A1 --> M1
    A2 --> M2
    AN --> MN

    M1 --> S1_R
    M1 --> S1_L
    M2 --> S2_R
    M2 --> S2_L
    MN --> SN_R
    MN --> SN_L

    style S1_R fill:#ffebee
    style S1_L fill:#e8f5e8
    style S2_R fill:#ffebee
    style S2_L fill:#e8f5e8
    style SN_R fill:#ffebee
    style SN_L fill:#e8f5e8
```

### Memory Types with Hybrid Storage

The system supports different types of memories, each with specialized storage in our **hybrid state backend**.
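
As a rough illustration of how a hybrid backend might route operations, the sketch below keeps an append-only history log next to a vector index, both as plain in-memory Python structures. The class names `HistoryStore`, `VectorStore`, and `HybridBackend`, and all of their methods, are assumptions made for this example and are not part of flink-agents; an actual implementation would sit on Flink state and Lucene rather than on lists and dicts.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# (memory_id, old_value, new_value, operation)
HistoryEntry = Tuple[str, Optional[str], Optional[str], str]


@dataclass
class HistoryStore:
    """Append-only log of memory operations (in-memory stand-in)."""
    log: List[HistoryEntry] = field(default_factory=list)

    def add_history(self, memory_id: str, old: Optional[str],
                    new: Optional[str], op: str) -> None:
        self.log.append((memory_id, old, new, op))

    def last(self, n: int) -> List[HistoryEntry]:
        # Guard n <= 0 explicitly: log[-0:] would return the whole log.
        return self.log[-n:] if n > 0 else []


@dataclass
class VectorStore:
    """Maps memory IDs to embedding vectors (in-memory stand-in)."""
    vectors: Dict[str, List[float]] = field(default_factory=dict)

    def insert(self, memory_id: str, vector: List[float]) -> None:
        self.vectors[memory_id] = vector

    def delete(self, memory_id: str) -> None:
        self.vectors.pop(memory_id, None)


class HybridBackend:
    """Routes each write to both stores so the audit trail stays complete."""

    def __init__(self) -> None:
        self.history = HistoryStore()
        self.vectors = VectorStore()

    def add(self, memory_id: str, text: str, vector: List[float]) -> None:
        self.vectors.insert(memory_id, vector)
        self.history.add_history(memory_id, None, text, "ADD")

    def delete(self, memory_id: str, old_text: str) -> None:
        self.vectors.delete(memory_id)
        self.history.add_history(memory_id, old_text, None, "DELETE")


backend = HybridBackend()
backend.add("m1", "likes tea", [1.0, 0.0])
backend.add("m2", "lives in Oslo", [0.0, 1.0])
backend.delete("m1", "likes tea")
print(backend.history.last(2))
```

The history log retains the deleted memory's old value, which is what makes it usable as an audit trail even after the vector index has dropped the entry.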

**Sensory memory operates internally and is not exposed to users**:

```mermaid
graph TD
    subgraph "Memory Types"
        SM["Sensory Memory<br/>Agent Events<br/>MapState<br/>Internal Only"]
        STM["Short-term Memory<br/>Recent History<br/>History Storage<br/>get_history(n)"]
        LTM["Long-term Memory<br/>Semantic Search<br/>Vector Storage<br/>search(query)"]
    end

    subgraph "Hybrid State Backend"
        HB_R["History Store"]
        HB_L["Vector Store"]
    end

    STM -.-> HB_R
    LTM -.-> HB_L

    style SM fill:#e1f5fe
    style STM fill:#f3e5f5
    style LTM fill:#e8f5e8
    style HB_R fill:#ffebee
    style HB_L fill:#e8f5e8
```

### Memory Types and Implementation

The system supports three distinct types of memories, each serving a different purpose and using a different storage mechanism:

#### Sensory Memory (Event Processing)

Sensory memory captures real-time events from agents and stores them in Flink's keyed state using `MapState`. This represents the immediate sensory input that agents receive from their environment. **Sensory memory is completely invisible to users** and operates automatically in the background.

**Characteristics:**

- **Storage**: Flink MapState (in-memory + checkpointed)
- **Retention**: Configurable window size
- **Access Pattern**: Lookup
- **Performance**: Nanosecond-scale latency
- **Visibility**: Internal only, no user API access

#### Short-term Memory (Recent History)

Short-term memory maintains a configurable history of recent agent interactions and experiences. It uses history storage for persistence and provides fast access to recent memories.

**Characteristics:**

- **Storage**: History storage (local to each TaskManager)
- **Retention**: Persistent, configurable TTL
- **Access Pattern**: Sequential, indexed by time
- **Performance**: Sub-millisecond latency

#### Long-term Memory (Semantic Search)

Long-term memory provides semantic search capabilities using a vector storage backend. It stores memories with embeddings for similarity-based retrieval.
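
To make similarity-based retrieval concrete, here is a minimal sketch that ranks stored embeddings by cosine similarity to a query vector. The vectors are hand-written toy values and the helper names `cosine` and `search_vectors` are assumptions for illustration; in practice the embeddings would come from a model, and an index such as Lucene's would normally avoid the full scan used here.

```python
import math
from typing import Dict, List, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search_vectors(index: Dict[str, List[float]], query_vec: List[float],
                   limit: int = 10) -> List[Tuple[str, float]]:
    """Brute-force scan: score every stored vector, keep the top `limit`."""
    scored = [(memory_id, cosine(vec, query_vec)) for memory_id, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:limit]


# Toy two-dimensional "embeddings" keyed by memory ID.
index = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
top = search_vectors(index, [1.0, 0.1], limit=2)
print([memory_id for memory_id, _ in top])
```

With these toy vectors the query points almost along the first axis, so memory `a` ranks first and the diagonal vector `c` second.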

**Characteristics:**

- **Storage**: Vector storage backend
- **Retention**: Persistent, configurable TTL
- **Access Pattern**: Semantic search, similarity-based
- **Performance**: Optimized for search operations

### Memory API Design

The Memory API provides a unified interface for different types of memories. **Sensory memory is not exposed through the API** and operates automatically in the background.

Memory inherits from Flink's State API. It behaves like other Flink KeyedState implementations and can be saved to StateBackends and checkpointed for fault tolerance and recovery.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


# MemoryItem is a pydantic model; it must not also be decorated with @dataclass.
class MemoryItem(BaseModel):
    id: str = Field(..., description="The unique identifier for the text data")
    memory: str = Field(..., description="The memory deduced from the text data")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata for the text data")
    created_at: Optional[str] = Field(None, description="The timestamp when the memory was created")
    updated_at: Optional[str] = Field(None, description="The timestamp when the memory was updated")


class MemoryAPI(ABC):
    """
    Core memory interface for Flink agents.

    This API provides access to different types of memories:
    - Short-term memory: Recent history via get_history(n) method
    - Long-term memory: Semantic search via search() method

    Note: Sensory memory is internal and not exposed through this API.
    """

    @abstractmethod
    def add(self, memory: MemoryItem) -> str:
        """Store a memory item"""
        pass

    @abstractmethod
    def update(self, memory_id: str, data: MemoryItem) -> MemoryItem:
        """Update a memory by ID"""
        pass

    @abstractmethod
    def delete(self, memory_id: str) -> None:
        """Delete a memory by ID"""
        pass

    @abstractmethod
    def get(self, memory_id: str) -> Optional[MemoryItem]:
        """Retrieve a memory by ID"""
        pass

    @abstractmethod
    def get_history(self, n: int = 10) -> List[MemoryItem]:
        """Get last n memories"""
        pass

    @abstractmethod
    def search(self, query: str, limit: int = 10) -> List[MemoryItem]:
        """Search memories by semantic similarity"""
        pass
```

## Knowledge

Knowledge is a special type of long-term memory that is shared across all Agent instances. It provides access to external, pre-built knowledge sources through remote vector database connections, offering domain-specific knowledge that can enhance agent responses. Unlike individual agent memories, knowledge is globally accessible and persistent across the entire Flink cluster.

Since knowledge is independent of any particular agent, it does not require Flink state checkpointing.

### Knowledge Architecture

```mermaid
graph TB
    subgraph "Flink Cluster"
        subgraph "Agent Instances"
            A1[Agent 1]
            A2[Agent 2]
            AN[Agent N]
        end
    end

    subgraph "External Knowledge Store"
        KB1[Knowledge Base 1<br/>Domain A]
        KB2[Knowledge Base 2<br/>Domain B]
        KB3[Knowledge Base N<br/>Domain N]
    end

    A1 -.-> KB1
    A1 -.-> KB2
    A1 -.-> KB3
    A2 -.-> KB1
    A2 -.-> KB2
    A2 -.-> KB3
    AN -.-> KB1
    AN -.-> KB2
    AN -.-> KB3

    style KB1 fill:#fff3e0
    style KB2 fill:#fff3e0
    style KB3 fill:#fff3e0
```

### Knowledge Base Implementation

```python
from typing import Any, Dict, List, Optional


# Defined before KnowledgeBase because its name is used in annotations there.
class KnowledgeItem:
    """Represents a knowledge item from external sources"""

    def __init__(self, content: str, source: str, score: float,
                 metadata: Optional[Dict[str, Any]] = None):
        self.content = content
        self.source = source
        self.score = score
        self.metadata = metadata or {}

    @classmethod
    def from_response(cls, item: Dict[str, Any]) -> "KnowledgeItem":
        # Assumption: each response item is a mapping with these keys.
        return cls(item["content"], item["source"], item["score"], item.get("metadata"))


class KnowledgeBase:
    """
    Shared knowledge base integration.

    Provides access to pre-built knowledge sources through remote vector
    database connections. This is a special type of long-term memory that
    is shared across all agent instances in the cluster.
    """

    def __init__(self, service_url: str, api_key: Optional[str] = None):
        self.service_url = service_url
        self.api_key = api_key
        # VectorDBClient is a placeholder for the remote vector database
        # client; this proposal does not define it.
        self.client = VectorDBClient(service_url, api_key)

    def search(self, query: str, knowledge_source: Optional[str] = None,
               limit: int = 10) -> List[KnowledgeItem]:
        """
        Search external knowledge base.

        Args:
            query: Search query string
            knowledge_source: Specific knowledge source to search (optional)
            limit: Maximum number of results to return

        Returns:
            List of knowledge items with relevance scores
        """
        response = self.client.search(
            query=query,
            source=knowledge_source,
            limit=limit
        )
        return [KnowledgeItem.from_response(item) for item in response.results]

    def get_sources(self) -> List[str]:
        """Get available knowledge sources"""
        return self.client.list_sources()

    def get_source_info(self, source_name: str) -> Dict[str, Any]:
        """Get information about a specific knowledge source"""
        return self.client.get_source_info(source_name)
```

## Memory Compaction

Memory compaction addresses a fundamental limitation of Large Language Models (LLMs): their fixed context windows. While the Memory API can store a volume of memories limited only by available storage, LLM context windows range from several thousand to several million tokens.

### Compaction Strategies

Memory compaction reduces memory volume while preserving essential information through two main approaches:

#### 1. Scoring-Based Compaction

Scoring-based compaction evaluates the relevance and importance of each memory item using signals such as recency, frequency of access, semantic similarity to the current query, and user-defined importance markers.
By ranking memories based on these criteria, the system retains only the most significant ones.

#### 2. Summarization-Based Compaction

Summarization-based compaction uses LLMs to create intelligent summaries of memory groups. This approach groups similar memories together and generates concise summaries that capture essential information from multiple related memories.

### Integration

The compaction functionality is integrated into the Agent API. This lets agents access relevant memory without exceeding the context window, which in turn means better context window usage, faster LLM inference, and lower token costs.

## Data Flow

### Memory Addition Flow

```mermaid
sequenceDiagram
    participant Agent
    participant Memory
    participant LLM
    participant Embeddings
    participant VectorStore
    participant HistoryStore
    participant MemoryCompactor

    Agent->>Memory: add(messages, agent_id)
    Memory->>LLM: extract_facts(messages)
    LLM-->>Memory: extracted_facts[]

    loop For each fact
        Memory->>Embeddings: embed(fact)
        Embeddings-->>Memory: vector
        Memory->>VectorStore: search_similar(vector)
        VectorStore-->>Memory: existing_memories[]
    end

    Memory->>LLM: update_memory_decisions(facts, existing)
    LLM-->>Memory: actions[ADD/UPDATE/DELETE]

    loop For each action
        alt ADD
            Memory->>VectorStore: insert(vector, metadata)
            Memory->>HistoryStore: add_history(memory_id, null, new_memory, "ADD")
        else UPDATE
            Memory->>VectorStore: update(memory_id, vector, metadata)
            Memory->>HistoryStore: add_history(memory_id, old_memory, new_memory, "UPDATE")
        else DELETE
            Memory->>VectorStore: delete(memory_id)
            Memory->>HistoryStore: add_history(memory_id, old_memory, null, "DELETE")
        end
    end

    Memory-->>Agent: memory_results[]

    alt triggers_compaction
        Agent->>MemoryCompactor: compact()
        MemoryCompactor-->>Agent: compaction_complete
    end
```

### Memory Search Flow

```mermaid
sequenceDiagram
    participant Agent
    participant Memory
    participant Embeddings
    participant VectorStore

    Agent->>Memory: search(query)
    Memory->>Embeddings: embed(query)
    Embeddings-->>Memory: query_vector
    Memory->>VectorStore: search(query_vector, filters)
    VectorStore-->>Memory: similar_memories[]
    Memory-->>Agent: search_results[]
```

## Integration in Agents

```python
import time
from typing import List, Optional

# Agent and AgentEvent come from flink-agents. self.memory (a MemoryAPI) and
# self.memory_compactor are assumed to be provided by the Agent base class.


class AgentWithMemory(Agent):
    """Flink agent with shared memory and knowledge integration"""

    def __init__(self, agent_id: str, knowledge_bases: Optional[List[KnowledgeBase]] = None):
        super().__init__(agent_id)
        self.knowledge_bases = knowledge_bases or []

    def search_knowledge(self, query: str, source: Optional[str] = None,
                         limit: int = 10) -> List[KnowledgeItem]:
        """
        Search across all available shared knowledge bases.

        Args:
            query: Search query
            source: Specific knowledge source (optional)
            limit: Maximum results per knowledge base and in the combined result

        Returns:
            Combined results from all shared knowledge bases
        """
        all_results = []

        for kb in self.knowledge_bases:
            # Query a knowledge base only if it serves the requested source.
            if source is None or source in kb.get_sources():
                results = kb.search(query, knowledge_source=source, limit=limit)
                all_results.extend(results)

        # Sort by relevance score
        all_results.sort(key=lambda x: x.score, reverse=True)
        return all_results[:limit]

    def process_event(self, event: AgentEvent):
        """Process event with memory and knowledge integration"""
        # Get memory context
        recent_history = self.memory.get_history(5)
        relevant_memories = self.memory.search(event.content, limit=10)
        compacted_context = self.memory_compactor.get()

        # Get knowledge context
        knowledge_context = self.search_knowledge(event.content, limit=5)

        # Generate response with compacted context
        response = self.generate_response(
            event, recent_history, relevant_memories, compacted_context, knowledge_context
        )

        # Store interaction, using one timestamp for the ID and all fields
        now = time.time()
        interaction_memory = MemoryItem(
            id=f"interaction_{now}",
            memory=f"Event: {event.content}, Response: {response}",
            metadata={
                "timestamp": now,
                "agent_id": self.agent_id,
            },
            created_at=str(now),
        )
        self.memory.add(interaction_memory)

        # triggers_compaction() is a placeholder for a compaction policy
        # check that this proposal leaves unspecified.
        if self.triggers_compaction():
            self.memory_compactor.compact()

        return response
```

## Execution Plan

The implementation will be executed in three phases:

### Phase 1: Core Memory Foundation

- **Short-term Memory**: History-based storage using MapState/ListState
- **Memory Compaction**: Scoring and summarization strategies
- **Knowledge Integration**: External knowledge base connections
- **Architecture**: Memory delegates to MapState/ListState (no Flink State inheritance)
- **Long-term Memory**: Replaced by compacted memory results

### Phase 2: Flink State Integration

- **Memory State Inheritance**: Memory inherits from Flink's State API
- **State Backend Integration**: Full integration with Flink state backends
- **Checkpointing Support**: Fault tolerance through Flink checkpointing

### Phase 3: Hybrid State Backend

- **Hybrid State Backend**: History Store and Vector Store.

## Conclusion

The Flink-Agents Memory System introduces a **hybrid state backend** that combines RocksDB/ForSt and Lucene instances within a unified state management framework. This dual-engine approach removes the need for an external vector database for per-agent memory (shared Knowledge still lives in external stores) while targeting good performance for both historical data storage and semantic search.

### Key Innovations

1. **Hybrid State Backend**: The core innovation, combining RocksDB/ForSt instances for historical data and Lucene instances for vector search
2. **Unified Interface**: Single API that automatically routes operations to the appropriate storage engine
3. **Distributed Architecture**: Each TaskManager hosts its own hybrid backend with isolated RocksDB/ForSt and Lucene instances
4. **Coordinated Checkpointing**: Ensures consistent state recovery across both storage engines

This design provides a solid foundation for building sophisticated, memory-aware agents in Apache Flink environments, with the hybrid state backend serving as the cornerstone of the architecture.
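
As a closing illustration of the scoring-based compaction strategy from the Memory Compaction section, the sketch below combines recency, access frequency, query similarity, and a user-set importance flag into one score and keeps the top-ranked memories. The `ScoredMemory` fields, the one-hour half-life, and the weights are all assumptions chosen for this example; the proposal does not specify a scoring formula.

```python
import math
from dataclasses import dataclass
from typing import List


@dataclass
class ScoredMemory:
    text: str
    age_seconds: float    # time since the memory was last updated
    access_count: int     # how often the memory has been retrieved
    similarity: float     # similarity to the current query, in [0, 1]
    pinned: bool = False  # user-defined importance marker


def score(m: ScoredMemory, half_life: float = 3600.0) -> float:
    """Combine the four signals; the weights are illustrative only."""
    recency = 0.5 ** (m.age_seconds / half_life)  # halves every half_life seconds
    frequency = math.log1p(m.access_count)        # diminishing returns on access
    importance = 10.0 if m.pinned else 0.0        # pinned items outrank the rest
    return recency + frequency + m.similarity + importance


def compact(memories: List[ScoredMemory], keep: int) -> List[ScoredMemory]:
    """Rank by score and retain only the `keep` highest-scoring memories."""
    return sorted(memories, key=score, reverse=True)[:keep]


memories = [
    ScoredMemory("old", age_seconds=36000, access_count=0, similarity=0.1),
    ScoredMemory("fresh", age_seconds=0, access_count=0, similarity=0.1),
    ScoredMemory("pinned", age_seconds=36000, access_count=0, similarity=0.0, pinned=True),
    ScoredMemory("popular", age_seconds=36000, access_count=20, similarity=0.1),
]
print([m.text for m in compact(memories, keep=2)])
```

An old, never-accessed, unpinned memory scores lowest and is the first to be dropped, which is the behavior the ranking is meant to produce.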

GitHub link: https://github.com/apache/flink-agents/discussions/92