twosom commented on code in PR #341: URL: https://github.com/apache/flink-agents/pull/341#discussion_r2600922812
########## examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java: ########## @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.examples.rag; + +import org.apache.flink.agents.api.Agent; +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.annotation.*; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent; +import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; +import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelSetup; +import org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Retrieval-Augmented Generation (RAG) example using Ollama for embeddings and chat along with + * Elasticsearch as the vector store. + * + * <p>This example demonstrates an agent that: + * + * <ul> + * <li>Embeds the incoming user query using an Ollama embedding model, + * <li>Retrieves relevant context from Elasticsearch via a {@code VectorStore}, + * <li>Formats a prompt that includes the retrieved context, and + * <li>Generates a response using an Ollama chat model. + * </ul> + * + * <p>Prerequisites: + * + * <ul> + * <li>Elasticsearch 8.x reachable from this example with an index that contains a {@code + * dense_vector} field for KNN search. + * <li>Ollama running locally (default {@code http://localhost:11434}) with the configured + * embedding and chat models available. 
+ * </ul> + * + * <p>Example Elasticsearch mapping (adjust index name, field, dims, and similarity to your needs): + * + * <pre>{@code + * { + * "mappings": { + * "properties": { + * "content": { "type": "text" }, + * "metadata": { "type": "object", "enabled": false }, + * "content_vector": { "type": "dense_vector", "dims": 768, "similarity": "cosine" } + * } + * } + * } + * }</pre> + * + * <p>System properties you can override: + * + * <ul> + * <li>{@code ES_HOST} (default {@code http://localhost:9200}) + * <li>{@code ES_INDEX} (default {@code my_documents}) + * <li>{@code ES_VECTOR_FIELD} (default {@code content_vector}) + * <li>{@code ES_DIMS} (default {@code 768}) + * <li>{@code ES_SIMILARITY} (default {@code cosine}) — used by the optional population step + * <li>Authentication (optional, used by both vector store and population step): + * <ul> + * <li>{@code ES_API_KEY_BASE64} — Base64 of {@code apiKeyId:apiKeySecret} + * <li>{@code ES_API_KEY_ID} and {@code ES_API_KEY_SECRET} — combined and Base64-encoded + * <li>{@code ES_USERNAME} and {@code ES_PASSWORD} — basic authentication + * </ul> + * <li>{@code OLLAMA_ENDPOINT} (default {@code http://localhost:11434}) + * <li>{@code OLLAMA_EMBEDDING_MODEL} (default {@code nomic-embed-text}) + * <li>{@code OLLAMA_CHAT_MODEL} (default {@code qwen3:8b}) + * <li>{@code ES_POPULATE} (default {@code true}) — whether to populate sample data on startup + * </ul> + * + * <p>Direct CLI flags (optional): <br> + * Instead of or in addition to system properties, you can pass flags when starting the job. CLI + * flags take precedence over existing system properties. 
+ * + * <ul> + * <li>{@code --es.host=http://your-es:9200} + * <li>{@code --es.username=elastic} and {@code --es.password=secret} + * <li>{@code --es.apiKeyBase64=BASE64_ID_COLON_SECRET} or {@code --es.apiKeyId=ID} with {@code + * --es.apiKeySecret=SECRET} + * <li>{@code --es.index=my_documents}, {@code --es.vectorField=content_vector}, {@code + * --es.dims=768}, {@code --es.similarity=cosine} + * <li>{@code --ollama.endpoint=http://localhost:11434}, {@code + * --ollama.embeddingModel=nomic-embed-text}, {@code --ollama.chatModel=qwen3:8b} + * <li>{@code --es.populate=true|false} + * </ul> + * + * <p>Examples: + * + * <pre>{@code + * # Use API key (base64 of id:secret) and custom host + * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \ + * examples.jar --es.host=http://es:9200 --es.apiKeyBase64=XXXXX= + * + * # Use basic authentication + * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \ + * examples.jar --es.host=http://es:9200 --es.username=elastic --es.password=secret + * }</pre> + * + * <p>Notes: + * + * <ul> + * <li>Authentication can be provided via either CLI flags or System properties; API key takes + * precedence over basic auth when both are present. + * <li>The optional knowledge base population step uses the same System properties to connect to + * Elasticsearch. + * </ul> + * + * <p>Running the example will: + * + * <ol> + * <li>Optionally populate the Elasticsearch index with sample documents and stored vectors, + * <li>Create a simple agent pipeline that retrieves context from Elasticsearch, and + * <li>Print the model's answers for a set of example queries. 
+ * </ol> + */ +public class ElasticsearchRagExample { + + public static class MyRagAgent extends Agent { + + @Prompt + public static org.apache.flink.agents.api.prompt.Prompt contextEnhancedPrompt() { + String template = + "Based on the following context, please answer the user's question.\n\n" + + "Context:\n{context}\n\n" + + "User Question:\n{user_query}\n\n" + + "Please provide a helpful answer based on the context provided."; + return new org.apache.flink.agents.api.prompt.Prompt(template); + } + + @EmbeddingModelConnection + public static ResourceDescriptor textEmbedderConnection() { + return ResourceDescriptor.Builder.newBuilder( + OllamaEmbeddingModelConnection.class.getName()) + .addInitialArgument( + "host", System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434")) + .addInitialArgument( + "model", + System.getProperty("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")) + .build(); + } + + @EmbeddingModelSetup + public static ResourceDescriptor textEmbedder() { + // Embedding setup referencing the embedding connection name + return ResourceDescriptor.Builder.newBuilder(OllamaEmbeddingModelSetup.class.getName()) + .addInitialArgument("connection", "textEmbedderConnection") + .build(); + } + + @ChatModelConnection + public static ResourceDescriptor ollamaChatModelConnection() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) + .addInitialArgument( + "endpoint", + System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434")) + .addInitialArgument("requestTimeout", 120) + .build(); + } + + @ChatModelSetup + public static ResourceDescriptor chatModel() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName()) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument( + "model", System.getProperty("OLLAMA_CHAT_MODEL", "qwen3:8b")) + .build(); + } + + @VectorStore + public static ResourceDescriptor knowledgeBase() { + ResourceDescriptor.Builder builder = + 
ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName()) + .addInitialArgument("embedding_model", "textEmbedder") + .addInitialArgument( + "index", System.getProperty("ES_INDEX", "my_documents")) + .addInitialArgument( + "vector_field", + System.getProperty("ES_VECTOR_FIELD", "content_vector")) + .addInitialArgument("dims", Integer.getInteger("ES_DIMS", 768)) + .addInitialArgument( + "host", System.getProperty("ES_HOST", "http://localhost:9200")); + + // Optional authentication + String apiKeyBase64 = System.getProperty("ES_API_KEY_BASE64"); + String apiKeyId = System.getProperty("ES_API_KEY_ID"); + String apiKeySecret = System.getProperty("ES_API_KEY_SECRET"); + String username = System.getProperty("ES_USERNAME"); + String password = System.getProperty("ES_PASSWORD"); + + if (apiKeyBase64 != null && !apiKeyBase64.isEmpty()) { + builder.addInitialArgument("api_key_base64", apiKeyBase64); + } else if (apiKeyId != null + && apiKeySecret != null + && !apiKeyId.isEmpty() + && !apiKeySecret.isEmpty()) { + builder.addInitialArgument("api_key_id", apiKeyId) + .addInitialArgument("api_key_secret", apiKeySecret); + } else if (username != null + && password != null + && !username.isEmpty() + && !password.isEmpty()) { + builder.addInitialArgument("username", username) + .addInitialArgument("password", password); + } + + return builder.build(); + } + + /** + * Converts an incoming {@link InputEvent} into a {@link ContextRetrievalRequestEvent} that + * asks the vector store to fetch relevant documents for the input string. The vector store + * resource is referenced by name ({@code "knowledgeBase"}). 
+ */ + @Action(listenEvents = {InputEvent.class}) + public static void processInput(InputEvent event, RunnerContext ctx) { + ctx.sendEvent( + new ContextRetrievalRequestEvent((String) event.getInput(), "knowledgeBase")); + } + + /** + * Receives retrieved documents from the vector store, constructs a context string, formats + * the prompt using the {@code contextEnhancedPrompt}, and emits a {@link ChatRequestEvent} + * targeting the configured chat model. + * + * @param event contains the user query and the list of retrieved documents + * @param context provides access to resources (e.g., the prompt) and lets the agent send + * the next event + */ + @Action(listenEvents = {ContextRetrievalResponseEvent.class}) + public static void processRetrievedContext( + ContextRetrievalResponseEvent<Map<String, Object>> event, RunnerContext context) + throws Exception { + final String userQuery = event.getQuery(); + final List<Document<Map<String, Object>>> docs = event.getDocuments(); + + // Build context text from retrieved documents + List<String> items = new ArrayList<>(); + for (int i = 0; i < docs.size(); i++) { + Object content = docs.get(i).getContent(); + items.add(String.format("%d. %s", i + 1, content)); + } + String contextText = String.join("\n\n", items); Review Comment: @wenjin272 Thank you for your feedback. I believe this should be left entirely to the user's discretion. In the example, I converted it to String for simplicity. However, downstream users might want to extract only specific properties from the Map<String, Object> ContentT to use as context for the LLM. For example: ```java Map<String, Object> content = (Map) docs.get(i).getContent(); String someProperty = (String) content.get("<some-property>"); // and now we can use 'someProperty' for llm context ``` Of course, this means users need to be aware of the ContentT type returned by the specific vector database implementation, which makes the code more complex. 
However, it's important to note that retrieved documents are not always of type String. By default, Elasticsearch stores documents in JSON format. I think the current generic approach provides flexibility while keeping the example simple. Users who need String-only content can easily convert it, while users who need structured access to document fields can work with the Map directly. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
