GitHub user coderplay created a discussion: Flink-Agents Memory Design

## Introduction

Memory is fundamental to human intelligence—it shapes identity, guides 
decisions, and enables learning, adaptation, and meaningful relationships. In 
communication, memory allows us to recall past interactions, infer preferences, 
and maintain coherent, context-rich exchanges over long periods. In contrast, 
current AI agents powered by large language models (LLMs) are limited by fixed 
context windows and lack persistent memory, leading to forgetfulness, 
contradictions, and a diminished user experience. Even as LLMs’ context windows 
grow, they cannot match the human ability to retain and retrieve relevant 
information across sessions and topics. This limitation is especially 
problematic in domains requiring continuity and trust, such as healthcare and 
education. To overcome these challenges, AI agents need robust memory systems 
that can selectively store, consolidate, and retrieve important 
information—mirroring human cognition. Such systems will enable AI agents to 
maintain consistent personas, track evolving user preferences, and build upon 
prior exchanges, transforming them into reliable, long-term collaborators.

## Proposal

To address the critical limitations of current AI agents—lack of persistent 
memory and poor long-term relationship building—we propose a distributed 
memory system for flink-agents that mirrors human cognitive processes.

The solution implements three memory types: **Sensory Memory** (internal 
events), **Short-term Memory** (recent history), and **Long-term Memory** 
(semantic retrieval). It leverages Flink's distributed state management with a 
hybrid backend combining history store and vector store, directly addressing 
LLM context window limitations through persistent, searchable memory.

Additionally, **Knowledge** provides shared long-term memory accessible across 
all agent instances, stored externally without Flink checkpointing, enabling 
domain-specific expertise and consistent collaboration in trust-critical 
domains.

## Memory

### High-Level Architecture

The Flink-Agents Memory System introduces a **hybrid state backend** that 
combines two specialized state backends:

- **History Store**: Tracks all memory operations for audit trails and handles 
retrieval for short-term memories. The default implementation of the history 
store uses Flink's built-in state backends.
- **Vector Store**: Provides vector search capabilities for long-term semantic 
memory retrieval. The default implementation is based on embedded Lucene 
instances.

This hybrid approach optimizes both storage efficiency and search performance, 
unlike traditional single-backend solutions.

```mermaid
graph LR
    subgraph "Flink Cluster"
        subgraph "Task Manager 1"
            A1[Agent Instance 1]
            M1[Memory 1]
            subgraph "Hybrid State Backend 1"
                S1_R[History Store]
                S1_L[Vector Store]
            end
        end
        
        subgraph "Task Manager 2"
            A2[Agent Instance 2]
            M2[Memory 2]
            subgraph "Hybrid State Backend 2"
                S2_R[History Store]
                S2_L[Vector Store]
            end
        end
        
        subgraph "Task Manager N"
            AN[Agent Instance N]
            MN[Memory N]
            subgraph "Hybrid State Backend N"
                SN_R[History Store]
                SN_L[Vector Store]
            end
        end
    end
    
    A1 --> M1
    A2 --> M2
    AN --> MN
    
    M1 --> S1_R
    M1 --> S1_L
    M2 --> S2_R
    M2 --> S2_L
    MN --> SN_R
    MN --> SN_L
    
    style S1_R fill:#ffebee
    style S1_L fill:#e8f5e8
    style S2_R fill:#ffebee
    style S2_L fill:#e8f5e8
    style SN_R fill:#ffebee
    style SN_L fill:#e8f5e8
```
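The routing behavior of the hybrid backend can be sketched roughly as follows. This is a non-normative illustration: `HybridStateBackend` and its plain Python containers stand in for the real RocksDB/ForSt history store and Lucene vector store; the names are assumptions, not part of the design.

```python
# Illustrative sketch only: every write is recorded in the history store
# (audit trail) and applied to the vector store (searchable index), while
# reads route to whichever store serves the access pattern.
from typing import Any, Dict, List, Tuple

class HybridStateBackend:
    def __init__(self):
        self.history: List[Tuple[str, str, Any]] = []  # (op, key, value) log
        self.vectors: Dict[str, Any] = {}              # key -> indexed value

    def put(self, key: str, value: Any) -> None:
        self.history.append(("PUT", key, value))  # audit trail
        self.vectors[key] = value                 # searchable index

    def delete(self, key: str) -> None:
        self.history.append(("DELETE", key, None))
        self.vectors.pop(key, None)

    def recent_ops(self, n: int) -> List[Tuple[str, str, Any]]:
        # Served by the history store: sequential, time-ordered access.
        return self.history[-n:]

backend = HybridStateBackend()
backend.put("m1", "user prefers dark mode")
backend.delete("m1")
print(backend.recent_ops(2))  # both operations retained for audit
```

Note that even after a delete, the history store keeps the full operation log, which is what makes audit trails and short-term history retrieval possible.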

### Memory Types with Hybrid Storage

The system supports different types of memories, each with specialized storage 
in our **hybrid state backend**. **Sensory memory operates internally and is 
not exposed to users**:

```mermaid
graph TD
    subgraph "Memory Types"
        SM["Sensory Memory<br/>Agent Events<br/>MapState<br/>Internal Only"]
        STM["Short-term Memory<br/>Recent History<br/>History Storage<br/>get_history(n)"]
        LTM["Long-term Memory<br/>Semantic Search<br/>Vector Storage<br/>search(query)"]
    end
    
    subgraph "Hybrid State Backend"
        HB_R["History Store"]
        HB_L["Vector Store"]
    end
    
    STM -.-> HB_R
    LTM -.-> HB_L
    
    style SM fill:#e1f5fe
    style STM fill:#f3e5f5
    style LTM fill:#e8f5e8
    style HB_R fill:#ffebee
    style HB_L fill:#e8f5e8
```

### Memory Types and Implementation

The system supports three distinct types of memories, each serving different 
purposes and using different storage mechanisms:

#### Sensory Memory (Event Processing)

Sensory memory captures real-time events from agents and stores them in Flink's 
keyed state using `MapState`. This represents the immediate sensory input that 
agents receive from their environment. **Sensory memory is completely invisible 
to users** and operates automatically in the background.

**Characteristics:**
- **Storage**: Flink MapState (in-memory + checkpointed)
- **Retention**: Configurable window size
- **Access Pattern**: Lookup
- **Performance**: Nanosecond-level latency
- **Visibility**: Internal only - no user API access


#### Short-term Memory (Recent History)

Short-term memory maintains a configurable history of recent agent interactions 
and experiences. It uses history storage for persistence and provides fast 
access to recent memories.

**Characteristics:**
- **Storage**: History storage (local to each TaskManager)
- **Retention**: Persistent, configurable TTL
- **Access Pattern**: Sequential, indexed by time
- **Performance**: Sub-millisecond latency

#### Long-term Memory (Semantic Search)

Long-term memory provides semantic search capabilities using a vector storage 
backend. It stores memories with embeddings for similarity-based retrieval.

**Characteristics:**
- **Storage**: Vector storage backend
- **Retention**: Persistent, configurable TTL
- **Access Pattern**: Semantic search, similarity-based
- **Performance**: Optimized for search operations

### Memory API Design

The Memory API provides a unified interface for different types of memories. 
**Sensory memory is not exposed through the API** and operates automatically in 
the background.

Memory inherits from Flink's State API. It behaves like other Flink KeyedState 
implementations and can be saved to StateBackends and checkpointed for fault 
tolerance and recovery.


```python
class MemoryItem(BaseModel):
    id: str = Field(..., description="The unique identifier for the text data")
    memory: str = Field(
        ..., description="The memory deduced from the text data"
    )
    metadata: Optional[Dict[str, Any]] = Field(
        None, description="Additional metadata for the text data"
    )
    created_at: Optional[str] = Field(
        None, description="The timestamp when the memory was created"
    )
    updated_at: Optional[str] = Field(
        None, description="The timestamp when the memory was updated"
    )

class MemoryAPI(ABC):
    """
    Core memory interface for Flink agents.
    
    This API provides access to different types of memories:
    - Short-term memory: Recent history via get_history(n) method
    - Long-term memory: Semantic search via search() method
    
    Note: Sensory memory is internal and not exposed through this API.
    """
    
    @abstractmethod
    def add(self, memory: MemoryItem) -> str:
        """Store a memory item"""
        pass
    
    @abstractmethod
    def update(self, memory_id: str, data: MemoryItem) -> MemoryItem:
        """Update a memory by ID"""
        pass
    
    @abstractmethod
    def delete(self, memory_id: str) -> None:
        """Delete a memory by ID"""
        pass        
    
    @abstractmethod
    def get(self, memory_id: str) -> Optional[MemoryItem]:
        """Retrieve a memory by ID"""
        pass
    
    @abstractmethod
    def get_history(self, n: int = 10) -> List[MemoryItem]:
        """Get last n memories"""
        pass
    
    @abstractmethod
    def search(self, query: str, limit: int = 10) -> List[MemoryItem]:
        """Search memories by semantic similarity"""
        pass
```
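A minimal, non-normative sketch of how this interface could be implemented. A plain in-memory store stands in for the hybrid backend, and naive keyword overlap stands in for embedding-based search; `InMemoryMemory` and the simplified stdlib `MemoryItem` here are illustrative assumptions only:

```python
# Illustrative sketch: get_history() reads from an ordered history log
# (short-term memory), search() ranks by keyword overlap as a stand-in
# for vector similarity (long-term memory).
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class MemoryItem:
    id: str
    memory: str
    metadata: Optional[Dict[str, Any]] = None

class InMemoryMemory:
    def __init__(self):
        self._items: Dict[str, MemoryItem] = {}   # vector-store stand-in
        self._history: List[str] = []             # history-store stand-in

    def add(self, memory: MemoryItem) -> str:
        self._items[memory.id] = memory
        self._history.append(memory.id)
        return memory.id

    def get(self, memory_id: str) -> Optional[MemoryItem]:
        return self._items.get(memory_id)

    def get_history(self, n: int = 10) -> List[MemoryItem]:
        # Short-term memory: last n items, most recent first.
        return [self._items[i] for i in reversed(self._history[-n:])]

    def search(self, query: str, limit: int = 10) -> List[MemoryItem]:
        # Long-term memory stand-in: rank by keyword overlap with the query.
        terms = set(query.lower().split())
        scored = [(len(terms & set(m.memory.lower().split())), m)
                  for m in self._items.values()]
        scored.sort(key=lambda t: t[0], reverse=True)
        return [m for score, m in scored[:limit] if score > 0]

mem = InMemoryMemory()
mem.add(MemoryItem("m1", "user prefers dark mode"))
mem.add(MemoryItem("m2", "user lives in Berlin"))
print(mem.get_history(1)[0].id)       # m2
print(mem.search("dark mode")[0].id)  # m1
```

The split between an ordered history log and a searchable index mirrors the history-store/vector-store routing described in the architecture above.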

## Knowledge

Knowledge is a special type of long-term memory that is shared across all Agent 
instances. It provides access to external, pre-built knowledge sources through 
remote vector database connections, offering domain-specific knowledge that can 
enhance agent responses. Unlike individual agent memories, knowledge is 
globally accessible and persistent across the entire Flink cluster. Since 
knowledge is independent of any particular agent, it does not require Flink 
state checkpointing.

### Knowledge Architecture

```mermaid
graph TB
    subgraph "Flink Cluster"
        subgraph "Agent Instances"
            A1[Agent 1]
            A2[Agent 2]
            AN[Agent N]
        end
    end
    
    subgraph "External Knowledge Store"
        KB1[Knowledge Base 1<br/>Domain A]
        KB2[Knowledge Base 2<br/>Domain B]
        KB3[Knowledge Base N<br/>Domain N]
    end
    
    A1 -.-> KB1
    A1 -.-> KB2
    A1 -.-> KB3
    A2 -.-> KB1
    A2 -.-> KB2
    A2 -.-> KB3
    AN -.-> KB1
    AN -.-> KB2
    AN -.-> KB3
    
    style KB1 fill:#fff3e0
    style KB2 fill:#fff3e0
    style KB3 fill:#fff3e0
```

### Knowledge Base Implementation

```python
class KnowledgeBase:
    """
    Shared knowledge base integration.
    
    Provides access to pre-built knowledge sources through remote
    vector database connections. This is a special type of long-term
    memory that is shared across all agent instances in the cluster.
    """
    
    def __init__(self, service_url: str, api_key: str = None):
        self.service_url = service_url
        self.api_key = api_key
        self.client = VectorDBClient(service_url, api_key)
    
    def search(self, query: str, knowledge_source: str = None,
               limit: int = 10) -> List[KnowledgeItem]:
        """
        Search external knowledge base.
        
        Args:
            query: Search query string
            knowledge_source: Specific knowledge source to search (optional)
            limit: Maximum number of results to return
            
        Returns:
            List of knowledge items with relevance scores
        """
        response = self.client.search(
            query=query,
            source=knowledge_source,
            limit=limit
        )
        
        return [KnowledgeItem.from_response(item) for item in response.results]
    
    def get_sources(self) -> List[str]:
        """Get available knowledge sources"""
        return self.client.list_sources()
    
    def get_source_info(self, source_name: str) -> Dict[str, Any]:
        """Get information about a specific knowledge source"""
        return self.client.get_source_info(source_name)

class KnowledgeItem:
    """Represents a knowledge item from external sources"""

    def __init__(self, content: str, source: str, score: float,
                 metadata: Dict[str, Any] = None):
        self.content = content
        self.source = source
        self.score = score
        self.metadata = metadata or {}

    @classmethod
    def from_response(cls, item: Dict[str, Any]) -> "KnowledgeItem":
        """Build a KnowledgeItem from a raw search-response entry."""
        return cls(item["content"], item.get("source", ""),
                   item.get("score", 0.0), item.get("metadata"))
```

## Memory Compaction

Memory compaction addresses a fundamental limitation of Large Language Models 
(LLMs): their fixed context windows. While the Memory API can store vast 
amounts of memories limited only by available storage, LLM context windows 
range from several thousand to several million tokens.
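As a rough illustration of the constraint (using a crude word-based token estimate, not a real tokenizer):

```python
# Rough sketch of the context-budget problem. Token counts here are crude
# word-count estimates for illustration only.
def estimate_tokens(text: str) -> int:
    # Very rough heuristic: ~1.3 tokens per whitespace-separated word.
    return int(len(text.split()) * 1.3)

def fits_in_context(memories, budget_tokens: int) -> bool:
    return sum(estimate_tokens(m) for m in memories) <= budget_tokens

history = ["user asked about pricing tiers"] * 5000
print(fits_in_context(history, budget_tokens=8_000))   # False: must compact

compacted = ["summary: user repeatedly asked about pricing tiers"]
print(fits_in_context(compacted, budget_tokens=8_000)) # True
```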

### Compaction Strategies

Memory compaction reduces memory volume while preserving essential information 
through two main approaches:

#### 1. Scoring-Based Compaction

Scoring-based compaction evaluates the relevance and importance of each memory 
item using mechanisms such as recency, frequency of access, semantic similarity 
to the current query, and user-defined importance markers. By ranking memories 
based on these criteria, the system retains only the most significant ones.
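A possible scoring function combining recency and access frequency; the names, decay curve, and weights are illustrative assumptions, not part of the design:

```python
# Hypothetical scoring sketch: rank memories by a weighted blend of recency
# and access frequency, then keep only the top-k.
import time
from dataclasses import dataclass

@dataclass
class ScoredMemory:
    memory: str
    last_access: float   # unix timestamp
    access_count: int

def score(m: ScoredMemory, now: float,
          recency_weight: float = 0.7, frequency_weight: float = 0.3) -> float:
    recency = 1.0 / (1.0 + (now - m.last_access) / 3600.0)  # decays per hour
    frequency = min(m.access_count / 10.0, 1.0)             # capped at 1.0
    return recency_weight * recency + frequency_weight * frequency

def compact(memories, keep: int, now: float):
    ranked = sorted(memories, key=lambda m: score(m, now), reverse=True)
    return ranked[:keep]

now = time.time()
mems = [
    ScoredMemory("old, rarely used", now - 72 * 3600, 1),
    ScoredMemory("fresh, heavily used", now - 60, 9),
    ScoredMemory("old, heavily used", now - 72 * 3600, 20),
]
kept = compact(mems, keep=2, now=now)
print([m.memory for m in kept])  # fresh and heavily-used items survive
```

Semantic similarity to the current query and user-defined importance markers would enter the same weighted sum as additional terms.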

#### 2. Summarization-Based Compaction

Summarization-based compaction uses LLMs to create intelligent summaries of 
memory groups. This approach groups similar memories together and generates 
concise summaries that capture essential information from multiple related 
memories.
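A simplified sketch of the grouping step, with a trivial stand-in for the LLM summarization call (`summarize` and the `topic` field are illustrative assumptions):

```python
# Hypothetical sketch: group related memories and replace each group with a
# single summary. summarize() is a trivial stand-in for an LLM call.
from collections import defaultdict
from typing import Dict, List

def summarize(texts: List[str]) -> str:
    # Stand-in for an LLM summarization call.
    return f"summary of {len(texts)} memories: " + "; ".join(texts)[:80]

def compact_by_topic(memories: List[Dict[str, str]]) -> List[str]:
    groups: Dict[str, List[str]] = defaultdict(list)
    for m in memories:
        groups[m["topic"]].append(m["text"])
    # One summary per topic group instead of every raw memory.
    return [summarize(texts) for texts in groups.values()]

mems = [
    {"topic": "billing", "text": "asked about invoices"},
    {"topic": "billing", "text": "asked about refunds"},
    {"topic": "onboarding", "text": "requested setup guide"},
]
for s in compact_by_topic(mems):
    print(s)
```

In practice the grouping key would come from embedding-based clustering rather than an explicit topic label.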

### Integration

The compaction functionality is integrated into the Agent API to provide 
seamless compaction capabilities. This integration ensures that agents can 
efficiently access relevant memory information without exceeding context window 
limitations, resulting in optimized context window usage, faster LLM inference, 
and reduced token costs.

## Data Flow

### Memory Addition Flow

```mermaid
sequenceDiagram
    participant Agent
    participant Memory
    participant LLM
    participant Embeddings
    participant VectorStore
    participant HistoryStore
    participant MemoryCompactor
    
    Agent->>Memory: add(messages, agent_id)
    Memory->>LLM: extract_facts(messages)
    LLM-->>Memory: extracted_facts[]
    
    loop For each fact
        Memory->>Embeddings: embed(fact)
        Embeddings-->>Memory: vector
        Memory->>VectorStore: search_similar(vector)
        VectorStore-->>Memory: existing_memories[]
    end
    
    Memory->>LLM: update_memory_decisions(facts, existing)
    LLM-->>Memory: actions[ADD/UPDATE/DELETE]
    
    loop For each action
        alt ADD
            Memory->>VectorStore: insert(vector, metadata)
            Memory->>HistoryStore: add_history(memory_id, null, new_memory, "ADD")
        else UPDATE
            Memory->>VectorStore: update(memory_id, vector, metadata)
            Memory->>HistoryStore: add_history(memory_id, old_memory, new_memory, "UPDATE")
        else DELETE
            Memory->>VectorStore: delete(memory_id)
            Memory->>HistoryStore: add_history(memory_id, old_memory, null, "DELETE")
        end        
    end
    
    Memory-->>Agent: memory_results[]
    
    alt triggers_compaction
        Agent->>MemoryCompactor: compact()
        MemoryCompactor-->>Agent: compaction_complete
    end
```

### Memory Search Flow

```mermaid
sequenceDiagram
    participant Agent
    participant Memory
    participant Embeddings
    participant VectorStore
    
    Agent->>Memory: search(query)
    Memory->>Embeddings: embed(query)
    Embeddings-->>Memory: query_vector
    
    Memory->>VectorStore: search(query_vector, filters)
    VectorStore-->>Memory: similar_memories[]    
    
    Memory-->>Agent: search_results[]
```
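The search flow above can be sketched with a toy embedding; a real deployment would use an embedding model and the vector store's index, so `embed` here is purely an illustrative stand-in:

```python
# Minimal sketch of the search flow: embed the query, then rank stored
# memories by cosine similarity against their stored vectors.
import math
from collections import Counter
from typing import Dict, List, Tuple

def embed(text: str) -> Dict[str, float]:
    # Toy embedding: L2-normalized term-frequency vector over words.
    counts = Counter(text.lower().split())
    norm = math.sqrt(sum(c * c for c in counts.values()))
    return {w: c / norm for w, c in counts.items()}

def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    return sum(a[w] * b[w] for w in a.keys() & b.keys())

def search(query: str, store: List[Tuple[str, Dict[str, float]]],
           limit: int = 10) -> List[str]:
    qv = embed(query)
    ranked = sorted(store, key=lambda kv: cosine(qv, kv[1]), reverse=True)
    return [text for text, _ in ranked[:limit]]

store = [(t, embed(t)) for t in [
    "user prefers dark mode",
    "meeting scheduled for friday",
]]
print(search("dark mode settings", store, limit=1))  # best match first
```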

## Integration in Agents

```python
class AgentWithMemory(Agent):
    """Flink agent with shared memory and knowledge integration"""
    
    def __init__(self, agent_id: str,
                 knowledge_bases: List[KnowledgeBase] = None):
        super().__init__(agent_id)
        self.knowledge_bases = knowledge_bases or []
    
    def search_knowledge(self, query: str, source: str = None,
                         limit: int = 10) -> List[KnowledgeItem]:
        """
        Search across all available shared knowledge bases.
        
        Args:
            query: Search query
            source: Specific knowledge source (optional)
            limit: Maximum results per source
            
        Returns:
            Combined results from all shared knowledge bases
        """
        all_results = []
        
        for kb in self.knowledge_bases:
            if source is None or source in kb.get_sources():
                results = kb.search(query, knowledge_source=source, limit=limit)
                all_results.extend(results)
        
        # Sort by relevance score
        all_results.sort(key=lambda x: x.score, reverse=True)
        return all_results[:limit]
    
    def process_event(self, event: AgentEvent):
        """Process event with memory and knowledge integration"""
        # Get memory context
        recent_history = self.memory.get_history(5)
        relevant_memories = self.memory.search(event.content, limit=10)
        compacted_context = self.memory_compactor.get()
        
        # Get knowledge context
        knowledge_context = self.search_knowledge(event.content, limit=5)
        
        # Generate response with compacted context
        response = self.generate_response(
            event,
            recent_history,
            relevant_memories,
            compacted_context, 
            knowledge_context
        )
        
        # Store interaction
        interaction_memory = MemoryItem(
            id=f"interaction_{time.time()}",
            memory=f"Event: {event.content}, Response: {response}",
            metadata={
                "timestamp": time.time(),
                "agent_id": self.agent_id,
            },
            created_at=str(time.time())
        )
        self.memory.add(interaction_memory)
        if self.triggers_compaction():
            self.memory_compactor.compact()

        return response
```

## Execution Plan

The implementation will be executed in three phases:

### Phase 1: Core Memory Foundation
- **Short-term Memory**: History-based storage using MapState/ListState
- **Memory Compaction**: Scoring and summarization strategies
- **Knowledge Integration**: External knowledge base connections
- **Architecture**: Memory delegates to MapState/ListState (no Flink State 
inheritance)
- **Long-term Memory**: Replaced by compacted memory results

### Phase 2: Flink State Integration
- **Memory State Inheritance**: Memory inherits from Flink's State API
- **State Backend Integration**: Full integration with Flink state backends
- **Checkpointing Support**: Fault tolerance through Flink checkpointing

### Phase 3: Hybrid State Backend
- **Hybrid State Backend**: History Store and Vector Store.

## Conclusion

The Flink-Agents Memory System introduces a **hybrid state backend** that 
combines RocksDB/ForSt and Lucene instances within a unified state management 
framework. This dual-engine approach eliminates the need for external vector 
databases while providing strong performance for both historical data storage 
and semantic search operations.

### Key Innovations

1. **Hybrid State Backend**: The core innovation combining RocksDB/ForSt 
instances for historical data and Lucene instances for vector search
2. **Unified Interface**: Single API that automatically routes operations to 
the appropriate storage engine
3. **Distributed Architecture**: Each TaskManager hosts its own hybrid backend 
with isolated RocksDB/ForSt and Lucene instances
4. **Coordinated Checkpointing**: Ensures consistent state recovery across both 
storage engines


This design provides a solid foundation for building sophisticated, 
memory-aware agents in Apache Flink environments, with the hybrid state backend 
serving as the cornerstone of the architecture.


GitHub link: https://github.com/apache/flink-agents/discussions/92
