GitHub user alnzng edited a discussion: Vector Store Integration for Flink Agents
## Overview

Vector stores enable agents to perform semantic search and knowledge retrieval over large document collections. This unlocks powerful new agent patterns, including **RAG (Retrieval-Augmented Generation)**, where agents search relevant documents before generating responses; **knowledge base agents** that answer questions from company documentation; and **context-aware conversations** that maintain relevant context across long interactions. Vector stores will also serve as a foundation for future long-term memory capabilities.

We propose adding **Vector Store** as a new resource type following the same architectural pattern as ChatModel, with multiple built-in implementations to support different use cases and deployment preferences.

## Design Overview

```
User Input → RAG Agent → Query Embedding → Vector Search → Context Retrieval → Enhanced Generation → Response

Detailed Event Flow:

┌────────────┐    ┌───────────────────┐    ┌──────────────────────────┐    ┌──────────────────┐    ┌───────────────────┐    ┌─────────────┐
│ InputEvent │───▶│ VectorSearchEvent │───▶│ VectorSearchResultEvent │───▶│ ChatRequestEvent │───▶│ ChatResponseEvent │───▶│ OutputEvent │
└────────────┘    └───────────────────┘    └──────────────────────────┘    └──────────────────┘    └───────────────────┘    └─────────────┘
                            │                        ▲                            │                        ▲
                            ▼                        │                            ▼                        │
                   ┌─────────────────┐               │                   ┌─────────────────┐               │
                   │  Vector Store   │───────────────┘                   │   Chat Model    │───────────────┘
                   │    Backend      │                                   │  (OpenAI/etc)   │
                   └─────────────────┘                                   └─────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ Embedding Model │
                   │ (text→vectors)  │
                   └─────────────────┘
```

## APIs

### Base Classes

```python
from abc import ABC, abstractmethod
from typing import List


class BaseEmbeddingModelConnection(Resource, ABC):
    """Base class for embedding model connections.

    Similar to BaseChatModelConnection, manages connection configuration for
    embedding service providers (OpenAI, Azure, local models, etc.).
    """

    @abstractmethod
    def embed_query(self, text: str) -> List[float]:
        """Generate embedding for a single query text."""
        pass


class BaseEmbeddingModelSetup(Resource, ABC):
    """Base class for embedding model setup.

    Similar to BaseChatModelSetup, configures the embedding model with specific
    model names, dimensions, and processing parameters.

    Attributes
    ----------
    connection_name : str
        Name of the connection resource to use
    model_name : str
        Name of the specific embedding model (e.g., "text-embedding-3-small")
    """

    connection_name: str
    model_name: str


class BaseVectorStoreConnection(Resource, ABC):
    """Base class for vector store connections.

    Manages connection configuration such as endpoints, authentication, and
    connection parameters.
    """

    @abstractmethod
    def search_with_embedding(
        self, embedding: List[float], k: int, **kwargs
    ) -> List[VectorSearchResult]:
        """Perform search using a pre-computed embedding vector."""
        pass


class BaseVectorStoreSetup(Resource, ABC):
    """Base class for vector store setup.

    Similar to BaseChatModelSetup, configures the vector store with specific
    collections, embedding models, and search parameters.

    Attributes
    ----------
    connection_name : str
        Name of the connection resource to use
    embedding_model : str
        Name of the embedding model resource to use
    """

    connection_name: str
    embedding_model: str
```
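The base classes above (and the events below) refer to a `VectorSearchResult` type that the proposal does not define yet. As a rough sketch only, the following shape would be consistent with the `r.document.content` access in the RAG example further down and with the "documents and scores" description of results; the field names are illustrative assumptions, not part of the proposal:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Document:
    """A stored document: its text content plus optional metadata."""

    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class VectorSearchResult:
    """A single search hit: the matched document and its similarity score."""

    document: Document
    score: float
```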
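To make the `BaseVectorStoreConnection` contract concrete, here is a hypothetical in-memory implementation (illustrative only, not a proposed built-in) that ranks stored documents by cosine similarity against the query embedding; it assumes the `Document`/`VectorSearchResult` sketch above:

```python
import math
from typing import List, Tuple


class InMemoryVectorStoreConnection(BaseVectorStoreConnection):
    """Illustrative toy backend holding pre-computed (embedding, Document) pairs."""

    def __init__(self, entries: List[Tuple[List[float], Document]], **kwargs):
        # How Resource subclasses are constructed depends on the framework;
        # simplified here for illustration.
        super().__init__(**kwargs)
        self._entries = entries

    def search_with_embedding(
        self, embedding: List[float], k: int, **kwargs
    ) -> List[VectorSearchResult]:
        def cosine(a: List[float], b: List[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        # Score every stored document against the query embedding and
        # return the k closest matches, highest similarity first.
        scored = [
            VectorSearchResult(document=doc, score=cosine(embedding, emb))
            for emb, doc in self._entries
        ]
        scored.sort(key=lambda r: r.score, reverse=True)
        return scored[:k]
```

A real implementation (e.g., the Elasticsearch one in the example below) would instead translate this call into a kNN query against the backing service.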
### Event Types

New event types for vector operations:

```python
class VectorSearchEvent(Event):
    """Event to request semantic search from a vector store.

    Attributes
    ----------
    vector_store : str
        Name of the vector store resource to use
    query : str
        Search query text
    k : int
        Number of results to return (default: 5)
    """

    vector_store: str
    query: str
    k: int = 5


class VectorSearchResultEvent(Event):
    """Event containing vector search results.

    Attributes
    ----------
    request : VectorSearchEvent
        The original search request
    results : List[VectorSearchResult]
        Search results with documents and scores
    """

    request: VectorSearchEvent
    results: List[VectorSearchResult]
```

### RAG Agent Example

```python
class RAGAgent(Agent):

    @embedding_model_connection
    @staticmethod
    def openai_embedding_connection() -> Tuple[Type[BaseEmbeddingModelConnection], Dict[str, Any]]:
        """OpenAI connection for embedding models."""
        return OpenAIEmbeddingConnection, {
            "name": "openai_embed_conn",
            "api_key": os.getenv("OPENAI_API_KEY")
        }

    @embedding_model
    @staticmethod
    def text_embeddings() -> Tuple[Type[BaseEmbeddingModelSetup], Dict[str, Any]]:
        """Text embedding model for semantic search."""
        return OpenAIEmbeddingModel, {
            "name": "text_embeddings",
            "connection": "openai_embed_conn",
            "model_name": "text-embedding-3-small"
        }

    @vector_store_connection
    @staticmethod
    def elasticsearch_connection() -> Tuple[Type[BaseVectorStoreConnection], Dict[str, Any]]:
        """Elasticsearch connection for vector store."""
        return ElasticsearchConnection, {
            "name": "es_conn",
            "hosts": ["http://localhost:9200"]
        }

    @vector_store
    @staticmethod
    def knowledge_base() -> Tuple[Type[BaseVectorStoreSetup], Dict[str, Any]]:
        """Knowledge base vector store using Elasticsearch."""
        return ElasticsearchVectorStore, {
            "name": "knowledge_base",
            "connection": "es_conn",
            "index": "documents",
            "embedding_model": "text_embeddings"  # Reference to embedding model resource
        }

    @chat_model_connection
    @staticmethod
    def openai_chat_connection() -> Tuple[Type[BaseChatModelConnection], Dict[str, Any]]:
        """OpenAI connection for chat model."""
        return OpenAIChatModelConnection, {
            "name": "openai_chat_conn",
            "api_key": os.getenv("OPENAI_API_KEY")
        }

    @chat_model
    @staticmethod
    def rag_chat_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, Any]]:
        """Chat model for RAG responses."""
        return OpenAIChatModelSetup, {
            "name": "rag_model",
            "connection": "openai_chat_conn",
            "model": "gpt-4o-mini"
        }

    @action(InputEvent)
    @staticmethod
    def search_knowledge(event: InputEvent, ctx: RunnerContext):
        # Search for relevant context first
        ctx.send_event(VectorSearchEvent(
            vector_store="knowledge_base",
            query=event.input,
            k=3
        ))

    @action(VectorSearchResultEvent)
    @staticmethod
    def generate_response(event: VectorSearchResultEvent, ctx: RunnerContext):
        # Use search results as context for the chat model
        context = "\n".join([r.document.content for r in event.results])
        enhanced_prompt = f"Context: {context}\nQuestion: {event.request.query}"
        ctx.send_event(ChatRequestEvent(
            model="rag_model",
            messages=[ChatMessage(content=enhanced_prompt)]
        ))
```

GitHub link: https://github.com/apache/flink-agents/discussions/143