GitHub user alnzng edited a discussion: Vector Store Integration for Flink 
Agents

## Overview

Vector stores enable agents to perform semantic search and knowledge retrieval 
over large document collections. This unlocks powerful new agent patterns 
including **RAG (Retrieval-Augmented Generation)** where agents search relevant 
documents before generating responses, **knowledge base agents** that answer 
questions from company documentation, and **context-aware conversations** that 
maintain relevant context across long interactions. Vector stores will also 
serve as a foundation for future long-term memory capabilities.

We propose adding **Vector Store** as a new resource type following the same 
architectural pattern as ChatModel, with multiple built-in implementations to 
support different use cases and deployment preferences.

## Design Overview

```
User Input → RAG Agent → Query Embedding → Vector Search → Context Retrieval → Enhanced Generation → Response

Detailed Event Flow:
┌─────────────┐    ┌──────────────────┐    ┌─────────────────────┐    ┌─────────────────┐    ┌──────────────────┐    ┌─────────────┐
│ InputEvent  │───▶│ VectorSearchEvent│───▶│ VectorSearchResult  │───▶│ ChatRequestEvent│───▶│ ChatResponseEvent│───▶│ OutputEvent │
│             │    │                  │    │       Event         │    │                 │    │                  │    │             │
└─────────────┘    └──────────────────┘    └─────────────────────┘    └─────────────────┘    └──────────────────┘    └─────────────┘
                            │                           ▲                        │                         ▲
                            ▼                           │                        ▼                         │
                   ┌─────────────────┐                  │               ┌─────────────────┐                │
                   │ Vector Store    │──────────────────┘               │ Chat Model      │────────────────┘
                   │ Backend         │                                  │ (OpenAI/etc)    │
                   └─────────────────┘                                  └─────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ Embedding Model │
                   │ (text→vectors)  │
                   └─────────────────┘
```


## APIs

### Base Classes

```python
from abc import ABC, abstractmethod
from typing import List


class BaseEmbeddingModelConnection(Resource, ABC):
    """Base class for embedding model connections.
    
    Similar to BaseChatModelConnection, manages connection configuration
    for embedding service providers (OpenAI, Azure, local models, etc.).
    """
    
    @abstractmethod
    def embed_query(self, text: str) -> List[float]:
        """Generate embedding for a single query text."""
        pass

class BaseEmbeddingModelSetup(Resource, ABC):
    """Base class for embedding model setup.
    
    Similar to BaseChatModelSetup, configures the embedding model with
    specific model names, dimensions, and processing parameters.
    
    Attributes:
    ----------
    connection_name : str
        Name of the connection resource to use
    model_name : str
        Name of the specific embedding model (e.g., "text-embedding-3-small")
    """
    
    connection_name: str
    model_name: str

class BaseVectorStoreConnection(Resource, ABC):
    """Base class for vector store connections.
    
    Manages connection configuration such as endpoints, authentication, and
    connection parameters.
    """

    @abstractmethod
    def search_with_embedding(
        self, embedding: List[float], k: int, **kwargs
    ) -> List[VectorSearchResult]:
        """Perform search using pre-computed embedding vector."""
        pass

class BaseVectorStoreSetup(Resource, ABC):
    """Base class for vector store setup.
    
    Similar to BaseChatModelSetup, configures the vector store with
    specific collections, embedding models, and search parameters.
    
    Attributes:
    ----------
    connection_name : str
        Name of the connection resource to use
    embedding_model : str
        Name of the embedding model resource to use
    """

    connection_name: str
    embedding_model: str
```
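
The proposal references `VectorSearchResult` without defining it and does not show a concrete connection. As a rough illustration only, the sketch below shows what a `search_with_embedding` implementation could look like for an in-memory store ranked by cosine similarity. The `Document` and `VectorSearchResult` dataclasses, the `InMemoryVectorStoreConnection` class, and its `add` helper are hypothetical names introduced here, and the `Resource` base class is left out so the snippet runs on its own.

```python
import math
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Document:
    """Hypothetical document type; the proposal does not define its fields."""
    content: str


@dataclass
class VectorSearchResult:
    """Hypothetical result type: a document plus its similarity score."""
    document: Document
    score: float


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryVectorStoreConnection:
    """Illustrative stand-in for a BaseVectorStoreConnection subclass."""

    def __init__(self) -> None:
        self._entries: List[Tuple[List[float], Document]] = []

    def add(self, embedding: List[float], document: Document) -> None:
        """Store a document with its pre-computed embedding (hypothetical helper)."""
        self._entries.append((embedding, document))

    def search_with_embedding(
        self, embedding: List[float], k: int, **kwargs
    ) -> List[VectorSearchResult]:
        """Return the k stored documents most similar to the query embedding."""
        scored = [
            VectorSearchResult(document=doc, score=cosine_similarity(embedding, vec))
            for vec, doc in self._entries
        ]
        scored.sort(key=lambda r: r.score, reverse=True)
        return scored[:k]
```

A real backend such as Elasticsearch would delegate the ranking to the server; the linear scan here is only meant to make the contract of the abstract method concrete.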

### Event Types

New event types for vector operations:

```python
class VectorSearchEvent(Event):
    """Event to request semantic search from vector store.
    
    Attributes:
    ----------
    vector_store : str
        Name of the vector store resource to use
    query : str  
        Search query text
    k : int
        Number of results to return (default: 5)
    """

    vector_store: str
    query: str
    k: int = 5

class VectorSearchResultEvent(Event):
    """Event containing vector search results.
    
    Attributes:
    ----------
    request : VectorSearchEvent
        The original search request
    results : List[VectorSearchResult] 
        Search results with documents and scores
    """

    request: VectorSearchEvent
    results: List[VectorSearchResult]
```
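
These two events imply a built-in action that resolves the named store, embeds the query, runs the search, and emits the result event. The proposal does not show that action, so the following is only a sketch under assumptions: plain dataclasses stand in for the `Event` subclasses, a dictionary stands in for the framework's resource lookup, and `VectorStore` and `handle_vector_search` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class VectorSearchEvent:
    """Stand-in for the proposed event (plain dataclass instead of Event)."""
    vector_store: str
    query: str
    k: int = 5


@dataclass
class VectorSearchResultEvent:
    """Stand-in for the proposed result event."""
    request: VectorSearchEvent
    results: list


@dataclass
class VectorStore:
    """Hypothetical pairing of an embedding function with a search function."""
    embed_query: Callable[[str], List[float]]
    search_with_embedding: Callable[..., list]


def handle_vector_search(
    event: VectorSearchEvent, stores: Dict[str, VectorStore]
) -> VectorSearchResultEvent:
    """Embed the query text, search the named store, and wrap the results."""
    store = stores[event.vector_store]  # raises KeyError for an unknown store
    embedding = store.embed_query(event.query)
    results = store.search_with_embedding(embedding, event.k)
    return VectorSearchResultEvent(request=event, results=results)
```

Carrying the original request inside the result event, as the proposal does, lets a downstream action recover the query text without keeping separate state.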

### RAG Agent Example

```python
import os
from typing import Any, Dict, Tuple, Type


class RAGAgent(Agent):
    @embedding_model_connection
    @staticmethod
    def openai_embedding_connection() -> (
        Tuple[Type[BaseEmbeddingModelConnection], Dict[str, Any]]
    ):
        """OpenAI connection for embedding models."""
        return OpenAIEmbeddingConnection, {
            "name": "openai_embed_conn",
            "api_key": os.getenv("OPENAI_API_KEY")
        }
    
    @embedding_model
    @staticmethod
    def text_embeddings() -> Tuple[Type[BaseEmbeddingModelSetup], Dict[str, Any]]:
        """Text embedding model for semantic search."""
        return OpenAIEmbeddingModel, {
            "name": "text_embeddings",
            "connection": "openai_embed_conn",
            "model_name": "text-embedding-3-small"
        }

    @vector_store_connection
    @staticmethod
    def elasticsearch_connection() -> (
        Tuple[Type[BaseVectorStoreConnection], Dict[str, Any]]
    ):
        """Elasticsearch connection for vector store."""
        return ElasticsearchConnection, {
            "name": "es_conn",
            "hosts": ["http://localhost:9200"]
        }

    @vector_store
    @staticmethod
    def knowledge_base() -> Tuple[Type[BaseVectorStoreSetup], Dict[str, Any]]:
        """Knowledge base vector store using Elasticsearch."""
        return ElasticsearchVectorStore, {
            "name": "knowledge_base",
            "connection": "es_conn",
            "index": "documents",
            # Reference to the embedding model resource
            "embedding_model": "text_embeddings"
        }

    @chat_model_server
    @staticmethod
    def openai_chat_connection() -> (
        Tuple[Type[BaseChatModelConnection], Dict[str, Any]]
    ):
        """OpenAI connection for chat model."""
        return OpenAIChatModelConnection, {
            "name": "openai_chat_conn",
            "api_key": os.getenv("OPENAI_API_KEY")
        }

    @chat_model
    @staticmethod
    def rag_chat_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, Any]]:
        """Chat model for RAG responses."""
        return OpenAIChatModelSetup, {
            "name": "rag_model",
            "connection": "openai_chat_conn",
            "model": "gpt-4o-mini"
        }

    @action(InputEvent)
    @staticmethod
    def search_knowledge(event: InputEvent, ctx: RunnerContext):
        # Search for relevant context first
        ctx.send_event(VectorSearchEvent(
            vector_store="knowledge_base",
            query=event.input,
            k=3
        ))

    @action(VectorSearchResultEvent)
    @staticmethod
    def generate_response(event: VectorSearchResultEvent, ctx: RunnerContext):
        # Use search results as context for chat model
        context = "\n".join([r.document.content for r in event.results])
        enhanced_prompt = f"Context: {context}\nQuestion: {event.request.query}"

        ctx.send_event(ChatRequestEvent(
            model="rag_model",
            messages=[ChatMessage(content=enhanced_prompt)]
        ))
```
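
The prompt assembly inside `generate_response` can also be pulled into a plain function so it can be unit-tested without a running agent. This is a minimal sketch; `build_rag_prompt` is a hypothetical helper name and not part of the proposal.

```python
from typing import List


def build_rag_prompt(question: str, contexts: List[str]) -> str:
    """Join retrieved passages and the user question into one prompt string."""
    context = "\n".join(contexts)
    return f"Context: {context}\nQuestion: {question}"
```

Inside the action it could be called as `build_rag_prompt(event.request.query, [r.document.content for r in event.results])`.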

GitHub link: https://github.com/apache/flink-agents/discussions/143
