GitHub user alnzng created a discussion: Vector Store Integration for Flink Agents
## Overview

Vector stores enable agents to perform semantic search and knowledge retrieval over large document collections. This unlocks powerful new agent patterns, including **RAG (Retrieval-Augmented Generation)**, where agents search for relevant documents before generating responses; **knowledge-base agents** that answer questions from company documentation; and **context-aware conversations** that maintain relevant context across long interactions. Vector stores will also serve as a foundation for future long-term memory capabilities.

We propose adding **Vector Store** as a new resource type that follows the same architectural pattern as ChatModel, with multiple built-in implementations to support different use cases and deployment preferences.

## Design Overview

```
User Input → RAG Agent → Vector Search → Context Retrieval → Enhanced Generation → Response

Detailed Event Flow:

┌─────────────┐    ┌──────────────────┐    ┌─────────────────────┐    ┌─────────────────┐    ┌──────────────────┐    ┌─────────────┐
│ InputEvent  │───▶│ VectorSearchEvent│───▶│ VectorSearchResult  │───▶│ ChatRequestEvent│───▶│ ChatResponseEvent│───▶│ OutputEvent │
│             │    │                  │    │ Event               │    │                 │    │                  │    │             │
└─────────────┘    └──────────────────┘    └─────────────────────┘    └─────────────────┘    └──────────────────┘    └─────────────┘
                            │                         ▲                        │                        ▲
                            ▼                         │                        ▼                        │
                   ┌─────────────────┐                │               ┌─────────────────┐               │
                   │  Vector Store   │────────────────┘               │   Chat Model    │───────────────┘
                   │    Backend      │                                │  (OpenAI/etc)   │
                   └─────────────────┘                                └─────────────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ Embedding Model │
                   │ (text→vectors)  │
                   └─────────────────┘
```

## APIs

### Base Classes

```python
class BaseVectorStoreConnection(Resource, ABC):
    """Base class for vector store connections.

    Manages connection configuration such as endpoints, authentication,
    and connection parameters.
""" @abstractmethod def search(self, query: str, k: int, **kwargs) -> List[VectorSearchResult]: """Perform semantic search.""" pass class BaseVectorStoreSetup(Resource, ABC): """Base class for vector store setup. Similar to BaseChatModelSetup, configures the vector store with specific collections, embedding models, and search parameters. Attributes: ---------- connection_name : str Name of the connection resource to use embedding_model : str Name/identifier of the embedding model to use """ connection_name: str embedding_model: str ``` ### Event Types New event types for vector operations: ```python class VectorSearchEvent(Event): """Event to request semantic search from vector store. Attributes: ---------- vector_store : str Name of the vector store resource to use query : str Search query text k : int Number of results to return (default: 5) """ vector_store: str query: str k: int = 5 class VectorSearchResultEvent(Event): """Event containing vector search results. Attributes: ---------- request : VectorSearchEvent The original search request results : List[VectorSearchResult] Search results with documents and scores """ request: VectorSearchEvent results: List[VectorSearchResult] ``` ### RAG Agent Example ```python class RAGAgent(Agent): @vector_store_connection @staticmethod def elasticsearch_connection() -> Tuple[Type[BaseVectorStoreConnection], Dict[str, Any]]: """Elasticsearch connection for vector store.""" return ElasticsearchConnection, { "name": "es_conn", "hosts": ["http://localhost:9200"] } @vector_store @staticmethod def knowledge_base() -> Tuple[Type[BaseVectorStoreSetup], Dict[str, Any]]: """Knowledge base vector store using Elasticsearch.""" return ElasticsearchVectorStore, { "name": "knowledge_base", "connection": "es_conn", "embedding_model": "<your_embedding_model_here>" } @chat_model_server @staticmethod def openai_connection() -> Tuple[Type[BaseChatModelConnection], Dict[str, Any]]: """OpenAI connection for chat model.""" return 
OpenAIChatModelConnection, { "name": "openai_conn", "api_key": "<your_openai_api_key_here>" } @chat_model @staticmethod def rag_chat_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, Any]]: """Chat model for RAG responses.""" return OpenAIChatModelSetup, { "name": "rag_model", "connection": "openai_conn", "model": "<your_chat_model_here>" } @action(InputEvent) @staticmethod def search_knowledge(event: InputEvent, ctx: RunnerContext): # Search for relevant context first ctx.send_event(VectorSearchEvent( vector_store="knowledge_base", query=event.input, k=3 )) @action(VectorSearchResultEvent) @staticmethod def generate_response(event: VectorSearchResultEvent, ctx: RunnerContext): # Use search results as context for chat model context = "\\n".join([r.document.content for r in event.results]) enhanced_prompt = f"Context: {context}\\nQuestion: {event.request.query}" ctx.send_event(ChatRequestEvent( model="rag_model", messages=[ChatMessage(content=enhanced_prompt)] )) ``` GitHub link: https://github.com/apache/flink-agents/discussions/143 ---- This is an automatically sent email for issues@flink.apache.org. To unsubscribe, please send an email to: issues-unsubscr...@flink.apache.org
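The APIs above reference a `VectorSearchResult` type whose definition is not part of the proposal text. A minimal sketch of what it might look like, consistent with how the RAG example accesses `r.document.content` and the stated "documents and scores" shape (the field names and `Document` type here are assumptions, not part of the proposal):

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Document:
    """A stored document; `metadata` as a free-form dict is an assumed shape."""
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class VectorSearchResult:
    """One search hit: the matched document plus a similarity score."""
    document: Document
    score: float
```

With this shape, the `generate_response` action's `[r.document.content for r in event.results]` comprehension works as written.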
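To illustrate the `search(query, k)` contract that `BaseVectorStoreConnection` defines, here is a self-contained, hypothetical in-memory sketch: it ranks documents by cosine similarity over a toy bag-of-words "embedding". A real implementation would delegate embedding to the configured embedding model and search to a backend such as Elasticsearch; none of the names below (`InMemoryVectorStore`, `embed`, `cosine`) are part of the proposal.

```python
import math
from collections import Counter
from typing import Dict, List, Tuple


def embed(text: str) -> Dict[str, float]:
    """Toy embedding: lower-cased token counts (stand-in for a real model)."""
    return dict(Counter(text.lower().split()))


def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    """Cosine similarity between two sparse vectors."""
    dot = sum(a[t] * b.get(t, 0.0) for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0


class InMemoryVectorStore:
    """Minimal stand-in for a vector store connection (no Resource base)."""

    def __init__(self) -> None:
        self._docs: List[Tuple[str, Dict[str, float]]] = []

    def add(self, content: str) -> None:
        """Index a document by embedding its content."""
        self._docs.append((content, embed(content)))

    def search(self, query: str, k: int = 5) -> List[Tuple[str, float]]:
        """Return the k most similar documents as (content, score) pairs."""
        q = embed(query)
        scored = [(content, cosine(q, vec)) for content, vec in self._docs]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:k]


store = InMemoryVectorStore()
store.add("flink supports exactly-once stream processing")
store.add("vector stores enable semantic search for agents")
top = store.search("semantic search", k=1)
```

The `k` cutoff after sorting mirrors the `k` parameter of `VectorSearchEvent`; swapping the toy `embed` for a real embedding model and `_docs` for a backend index is the essential difference from a production implementation.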