GitHub user alnzng created a discussion: Vector Store Integration for Flink Agents

## Overview

Vector stores enable agents to perform semantic search and knowledge retrieval 
over large document collections. This unlocks powerful new agent patterns 
including **RAG (Retrieval-Augmented Generation)** where agents search relevant 
documents before generating responses, **knowledge base agents** that answer 
questions from company documentation, and **context-aware conversations** that 
maintain relevant context across long interactions. Vector stores will also 
serve as a foundation for future long-term memory capabilities.

We propose adding **Vector Store** as a new resource type following the same 
architectural pattern as ChatModel, with multiple built-in implementations to 
support different use cases and deployment preferences.

## Design Overview

```
User Input → RAG Agent → Vector Search → Context Retrieval → Enhanced Generation → Response

Detailed Event Flow:
┌─────────────┐    ┌──────────────────┐    ┌─────────────────────┐    ┌─────────────────┐    ┌──────────────────┐    ┌─────────────┐
│ InputEvent  │───▶│ VectorSearchEvent│───▶│ VectorSearchResult  │───▶│ ChatRequestEvent│───▶│ ChatResponseEvent│───▶│ OutputEvent │
│             │    │                  │    │       Event         │    │                 │    │                  │    │             │
└─────────────┘    └──────────────────┘    └─────────────────────┘    └─────────────────┘    └──────────────────┘    └─────────────┘
                            │                         ▲                        │                      ▲
                            ▼                         │                        ▼                      │
                   ┌─────────────────┐                │               ┌─────────────────┐             │
                   │ Vector Store    │────────────────┘               │ Chat Model      │─────────────┘
                   │ Backend         │                                │ (OpenAI/etc)    │
                   └─────────────────┘                                └─────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ Embedding Model │
                   │ (text→vectors)  │
                   └─────────────────┘
```


## APIs

### Base Classes

```python
from abc import ABC, abstractmethod
from typing import List

class BaseVectorStoreConnection(Resource, ABC):
    """Base class for vector store connections.

    Manages connection configuration such as endpoints, authentication,
    and connection parameters.
    """

    @abstractmethod
    def search(self, query: str, k: int, **kwargs) -> List[VectorSearchResult]:
        """Perform semantic search."""
        pass

class BaseVectorStoreSetup(Resource, ABC):
    """Base class for vector store setup.
    
    Similar to BaseChatModelSetup, configures the vector store with
    specific collections, embedding models, and search parameters.
    
    Attributes:
    ----------
    connection_name : str
        Name of the connection resource to use
    embedding_model : str
        Name/identifier of the embedding model to use
    """

    connection_name: str
    embedding_model: str
```
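`VectorSearchResult` is referenced above but not defined in this proposal. The sketch below shows one plausible shape for it (plus a `Document` type) and a toy in-memory connection implementing the base class, purely to illustrate the contract. `Document`, `VectorSearchResult`, and `InMemoryVectorStoreConnection` are illustrative assumptions, not existing flink-agents classes:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Document:
    """One stored document chunk (hypothetical shape)."""
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class VectorSearchResult:
    """A single hit: the matched document and its similarity score."""
    document: Document
    score: float


def _cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = (sum(x * x for x in a) * sum(y * y for y in b)) ** 0.5
    return dot / norm if norm else 0.0


class InMemoryVectorStoreConnection(BaseVectorStoreConnection):
    """Toy connection that brute-forces similarity over in-memory vectors.

    Real connections (Elasticsearch, etc.) would delegate to the backend;
    how `Resource` is constructed is framework-specific, so this sketch
    simply forwards keyword arguments.
    """

    def __init__(self, embed_fn: Callable[[str], List[float]], **kwargs):
        super().__init__(**kwargs)
        self._embed = embed_fn
        self._entries: List[tuple] = []  # (vector, Document) pairs

    def add(self, doc: Document) -> None:
        """Embed and index a document."""
        self._entries.append((self._embed(doc.content), doc))

    def search(self, query: str, k: int, **kwargs) -> List[VectorSearchResult]:
        """Return the k documents most similar to the query text."""
        qv = self._embed(query)
        hits = [VectorSearchResult(doc, _cosine(qv, vec)) for vec, doc in self._entries]
        hits.sort(key=lambda r: r.score, reverse=True)
        return hits[:k]
```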

### Event Types

New event types for vector operations:

```python
class VectorSearchEvent(Event):
    """Event to request semantic search from vector store.
    
    Attributes:
    ----------
    vector_store : str
        Name of the vector store resource to use
    query : str  
        Search query text
    k : int
        Number of results to return (default: 5)
    """

    vector_store: str
    query: str
    k: int = 5

class VectorSearchResultEvent(Event):
    """Event containing vector search results.
    
    Attributes:
    ----------
    request : VectorSearchEvent
        The original search request
    results : List[VectorSearchResult] 
        Search results with documents and scores
    """

    request: VectorSearchEvent
    results: List[VectorSearchResult]
```
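For context, here is a rough sketch of how the runtime might service these events, mirroring the ChatModel flow described above. The resource lookup (`ctx.get_resource`) and the way such a handler would be registered are assumptions, not existing APIs:

```python
# Hypothetical runtime-side handler for VectorSearchEvent.
def handle_vector_search(event: VectorSearchEvent, ctx: RunnerContext) -> None:
    # Resolve the named vector store setup, e.g. "knowledge_base"
    # (assumed lookup helper, analogous to chat model resolution).
    store = ctx.get_resource(event.vector_store)

    # The setup embeds the query and delegates to its connection.
    results = store.search(query=event.query, k=event.k)

    # Return results to the agent, carrying the original request so
    # downstream actions still know what was asked.
    ctx.send_event(VectorSearchResultEvent(request=event, results=results))
```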

### RAG Agent Example

```python
class RAGAgent(Agent):
    @vector_store_connection
    @staticmethod
    def elasticsearch_connection() -> Tuple[Type[BaseVectorStoreConnection], Dict[str, Any]]:
        """Elasticsearch connection for vector store."""
        return ElasticsearchConnection, {
            "name": "es_conn",
            "hosts": ["http://localhost:9200";]
        }

    @vector_store
    @staticmethod
    def knowledge_base() -> Tuple[Type[BaseVectorStoreSetup], Dict[str, Any]]:
        """Knowledge base vector store using Elasticsearch."""
        return ElasticsearchVectorStore, {
            "name": "knowledge_base",
            "connection": "es_conn",
            "embedding_model": "<your_embedding_model_here>"
        }

    @chat_model_server
    @staticmethod
    def openai_connection() -> Tuple[Type[BaseChatModelConnection], Dict[str, Any]]:
        """OpenAI connection for chat model."""
        return OpenAIChatModelConnection, {
            "name": "openai_conn",
            "api_key": "<your_openai_api_key_here>"
        }

    @chat_model
    @staticmethod
    def rag_chat_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, Any]]:
        """Chat model for RAG responses."""
        return OpenAIChatModelSetup, {
            "name": "rag_model",
            "connection": "openai_conn",
            "model": "<your_chat_model_here>"
        }

    @action(InputEvent)
    @staticmethod
    def search_knowledge(event: InputEvent, ctx: RunnerContext):
        # Search for relevant context first
        ctx.send_event(VectorSearchEvent(
            vector_store="knowledge_base",
            query=event.input,
            k=3
        ))

    @action(VectorSearchResultEvent)
    @staticmethod
    def generate_response(event: VectorSearchResultEvent, ctx: RunnerContext):
        # Use search results as context for chat model
        context = "\n".join([r.document.content for r in event.results])
        enhanced_prompt = f"Context: {context}\nQuestion: {event.request.query}"

        ctx.send_event(ChatRequestEvent(
            model="rag_model",
            messages=[ChatMessage(content=enhanced_prompt)]
        ))
```
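
To close the loop shown in the event-flow diagram, the agent would also turn the chat model's reply into the final output. A minimal sketch, assuming `ChatResponseEvent` exposes `response.content` and that `OutputEvent` accepts an `output` field (both field names are assumptions):

```python
class RAGAgent(Agent):  # continuation of the example above
    ...

    @action(ChatResponseEvent)
    @staticmethod
    def emit_answer(event: ChatResponseEvent, ctx: RunnerContext):
        # Forward the chat model's answer as the agent's final output.
        ctx.send_event(OutputEvent(output=event.response.content))
```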

GitHub link: https://github.com/apache/flink-agents/discussions/143
