xintongsong commented on code in PR #341:
URL: https://github.com/apache/flink-agents/pull/341#discussion_r2609254129


##########
examples/src/main/java/org/apache/flink/agents/examples/rag/ElasticsearchRagExample.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples.rag;
+
+import org.apache.flink.agents.api.Agent;
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.annotation.*;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
+import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup;
+import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection;
+import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelSetup;
+import org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Retrieval-Augmented Generation (RAG) example using Ollama for embeddings and chat along with
+ * Elasticsearch as the vector store.
+ *
+ * <p>This example demonstrates an agent that:
+ *
+ * <ul>
+ *   <li>Embeds the incoming user query using an Ollama embedding model,
+ *   <li>Retrieves relevant context from Elasticsearch via a {@code VectorStore},
+ *   <li>Formats a prompt that includes the retrieved context, and
+ *   <li>Generates a response using an Ollama chat model.
+ * </ul>
+ *
+ * <p>Prerequisites:
+ *
+ * <ul>
+ *   <li>Elasticsearch 8.x reachable from this example with an index that contains a {@code
+ *       dense_vector} field for KNN search.
+ *   <li>Ollama running locally (default {@code http://localhost:11434}) with the configured
+ *       embedding and chat models available.
+ * </ul>
+ *
+ * <p>Example Elasticsearch mapping (adjust index name, field, dims, and similarity to your needs):
+ *
+ * <pre>{@code
+ * {
+ *   "mappings": {
+ *     "properties": {
+ *       "content": { "type": "text" },
+ *       "metadata": { "type": "object", "enabled": false },
+ *       "content_vector": { "type": "dense_vector", "dims": 768, "similarity": "cosine" }
+ *     }
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>System properties you can override:
+ *
+ * <ul>
+ *   <li>{@code ES_HOST} (default {@code http://localhost:9200})
+ *   <li>{@code ES_INDEX} (default {@code my_documents})
+ *   <li>{@code ES_VECTOR_FIELD} (default {@code content_vector})
+ *   <li>{@code ES_DIMS} (default {@code 768})
+ *   <li>{@code ES_SIMILARITY} (default {@code cosine}) — used by the optional population step
+ *   <li>Authentication (optional, used by both vector store and population step):
+ *       <ul>
+ *         <li>{@code ES_API_KEY_BASE64} — Base64 of {@code apiKeyId:apiKeySecret}
+ *         <li>{@code ES_API_KEY_ID} and {@code ES_API_KEY_SECRET} — combined and Base64-encoded
+ *         <li>{@code ES_USERNAME} and {@code ES_PASSWORD} — basic authentication
+ *       </ul>
+ *   <li>{@code OLLAMA_ENDPOINT} (default {@code http://localhost:11434})
+ *   <li>{@code OLLAMA_EMBEDDING_MODEL} (default {@code nomic-embed-text})
+ *   <li>{@code OLLAMA_CHAT_MODEL} (default {@code qwen3:8b})
+ *   <li>{@code ES_POPULATE} (default {@code true}) — whether to populate sample data on startup
+ * </ul>
+ *
+ * <p>Direct CLI flags (optional): <br>
+ * Instead of or in addition to system properties, you can pass flags when starting the job. CLI
+ * flags take precedence over existing system properties.
+ *
+ * <ul>
+ *   <li>{@code --es.host=http://your-es:9200}
+ *   <li>{@code --es.username=elastic} and {@code --es.password=secret}
+ *   <li>{@code --es.apiKeyBase64=BASE64_ID_COLON_SECRET} or {@code --es.apiKeyId=ID} with {@code
+ *       --es.apiKeySecret=SECRET}
+ *   <li>{@code --es.index=my_documents}, {@code --es.vectorField=content_vector}, {@code
+ *       --es.dims=768}, {@code --es.similarity=cosine}
+ *   <li>{@code --ollama.endpoint=http://localhost:11434}, {@code
+ *       --ollama.embeddingModel=nomic-embed-text}, {@code --ollama.chatModel=qwen3:8b}
+ *   <li>{@code --es.populate=true|false}
+ * </ul>
+ *
+ * <p>Examples:
+ *
+ * <pre>{@code
+ * # Use API key (base64 of id:secret) and custom host
+ * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \
+ *   examples.jar --es.host=http://es:9200 --es.apiKeyBase64=XXXXX=
+ *
+ * # Use basic authentication
+ * flink run ... -c org.apache.flink.agents.examples.rag.ElasticsearchRagExample \
+ *   examples.jar --es.host=http://es:9200 --es.username=elastic --es.password=secret
+ * }</pre>
+ *
+ * <p>Notes:
+ *
+ * <ul>
+ *   <li>Authentication can be provided via either CLI flags or System properties; API key takes
+ *       precedence over basic auth when both are present.
+ *   <li>The optional knowledge base population step uses the same System properties to connect to
+ *       Elasticsearch.
+ * </ul>
+ *
+ * <p>Running the example will:
+ *
+ * <ol>
+ *   <li>Optionally populate the Elasticsearch index with sample documents and stored vectors,
+ *   <li>Create a simple agent pipeline that retrieves context from Elasticsearch, and
+ *   <li>Print the model's answers for a set of example queries.
+ * </ol>
+ */
+public class ElasticsearchRagExample {
+
+    public static class MyRagAgent extends Agent {
+
+        @Prompt
+        public static org.apache.flink.agents.api.prompt.Prompt contextEnhancedPrompt() {
+            String template =
+                    "Based on the following context, please answer the user's question.\n\n"
+                            + "Context:\n{context}\n\n"
+                            + "User Question:\n{user_query}\n\n"
+                            + "Please provide a helpful answer based on the context provided.";
+            return new org.apache.flink.agents.api.prompt.Prompt(template);
+        }
+
+        @EmbeddingModelConnection
+        public static ResourceDescriptor textEmbedderConnection() {
+            return ResourceDescriptor.Builder.newBuilder(
+                            OllamaEmbeddingModelConnection.class.getName())
+                    .addInitialArgument(
+                            "host", System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434"))
+                    .addInitialArgument(
+                            "model",
+                            System.getProperty("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text"))
+                    .build();
+        }
+
+        @EmbeddingModelSetup
+        public static ResourceDescriptor textEmbedder() {
+            // Embedding setup referencing the embedding connection name
+            return ResourceDescriptor.Builder.newBuilder(OllamaEmbeddingModelSetup.class.getName())
+                    .addInitialArgument("connection", "textEmbedderConnection")
+                    .build();
+        }
+
+        @ChatModelConnection
+        public static ResourceDescriptor ollamaChatModelConnection() {
+            return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+                    .addInitialArgument(
+                            "endpoint",
+                            System.getProperty("OLLAMA_ENDPOINT", "http://localhost:11434"))
+                    .addInitialArgument("requestTimeout", 120)
+                    .build();
+        }
+
+        @ChatModelSetup
+        public static ResourceDescriptor chatModel() {
+            return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+                    .addInitialArgument("connection", "ollamaChatModelConnection")
+                    .addInitialArgument(
+                            "model", System.getProperty("OLLAMA_CHAT_MODEL", "qwen3:8b"))
+                    .build();
+        }
+
+        @VectorStore
+        public static ResourceDescriptor knowledgeBase() {
+            ResourceDescriptor.Builder builder =
+                    ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName())
+                            .addInitialArgument("embedding_model", "textEmbedder")
+                            .addInitialArgument(
+                                    "index", System.getProperty("ES_INDEX", "my_documents"))
+                            .addInitialArgument(
+                                    "vector_field",
+                                    System.getProperty("ES_VECTOR_FIELD", "content_vector"))
+                            .addInitialArgument("dims", Integer.getInteger("ES_DIMS", 768))
+                            .addInitialArgument(
+                                    "host", System.getProperty("ES_HOST", "http://localhost:9200"));
+
+            // Optional authentication
+            String apiKeyBase64 = System.getProperty("ES_API_KEY_BASE64");
+            String apiKeyId = System.getProperty("ES_API_KEY_ID");
+            String apiKeySecret = System.getProperty("ES_API_KEY_SECRET");
+            String username = System.getProperty("ES_USERNAME");
+            String password = System.getProperty("ES_PASSWORD");
+
+            if (apiKeyBase64 != null && !apiKeyBase64.isEmpty()) {
+                builder.addInitialArgument("api_key_base64", apiKeyBase64);
+            } else if (apiKeyId != null
+                    && apiKeySecret != null
+                    && !apiKeyId.isEmpty()
+                    && !apiKeySecret.isEmpty()) {
+                builder.addInitialArgument("api_key_id", apiKeyId)
+                        .addInitialArgument("api_key_secret", apiKeySecret);
+            } else if (username != null
+                    && password != null
+                    && !username.isEmpty()
+                    && !password.isEmpty()) {
+                builder.addInitialArgument("username", username)
+                        .addInitialArgument("password", password);
+            }
+
+            return builder.build();
+        }
+
+        /**
+         * Converts an incoming {@link InputEvent} into a {@link ContextRetrievalRequestEvent} that
+         * asks the vector store to fetch relevant documents for the input string. The vector store
+         * resource is referenced by name ({@code "knowledgeBase"}).
+         */
+        @Action(listenEvents = {InputEvent.class})
+        public static void processInput(InputEvent event, RunnerContext ctx) {
+            ctx.sendEvent(
+                    new ContextRetrievalRequestEvent((String) event.getInput(), "knowledgeBase"));
+        }
+
+        /**
+         * Receives retrieved documents from the vector store, constructs a context string, formats
+         * the prompt using the {@code contextEnhancedPrompt}, and emits a {@link ChatRequestEvent}
+         * targeting the configured chat model.
+         *
+         * @param event contains the user query and the list of retrieved documents
+         * @param context provides access to resources (e.g., the prompt) and lets the agent send
+         *     the next event
+         */
+        @Action(listenEvents = {ContextRetrievalResponseEvent.class})
+        public static void processRetrievedContext(
+                ContextRetrievalResponseEvent<Map<String, Object>> event, RunnerContext context)
+                throws Exception {
+            final String userQuery = event.getQuery();
+            final List<Document<Map<String, Object>>> docs = event.getDocuments();
+
+            // Build context text from retrieved documents
+            List<String> items = new ArrayList<>();
+            for (int i = 0; i < docs.size(); i++) {
+                Object content = docs.get(i).getContent();
+                items.add(String.format("%d. %s", i + 1, content));
+            }
+            String contextText = String.join("\n\n", items);

Review Comment:
   I think retrieving the entire map from ES does not necessarily mean feeding everything to the LLM. We can apply transformations to the retrieved data before sending it to the LLM. This can be done in several ways:
   - The user can add a custom Action that listens to the context retrieval response (a rough sketch is shown below).
   - We can introduce a built-in RagAgent that accepts a function to be applied to the retrieved documents.
   - We can add a function parameter to the general vector store interface that is applied to the retrieved data.
   With any of the above options, we don't need to do this specifically for each vector store implementation.
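
   As a rough illustration of the first option, below is a minimal sketch of a user-defined Action that trims the retrieved documents before they become prompt context. The action name and the specific transformation (top three documents, content capped at 500 characters) are hypothetical; it only relies on the event and document accessors already used in this example, and the prompt formatting and chat request emission that would follow are omitted.

   ```java
   // Hypothetical sketch: a custom Action the user could register in place of
   // processRetrievedContext to transform retrieved documents before prompting.
   @Action(listenEvents = {ContextRetrievalResponseEvent.class})
   public static void transformRetrievedContext(
           ContextRetrievalResponseEvent<Map<String, Object>> event, RunnerContext context)
           throws Exception {
       List<Document<Map<String, Object>>> docs = event.getDocuments();

       // Example transformation: keep only the top 3 documents and cap each
       // content string at 500 characters instead of feeding everything to the LLM.
       List<String> items = new ArrayList<>();
       for (int i = 0; i < Math.min(3, docs.size()); i++) {
           String content = String.valueOf(docs.get(i).getContent());
           items.add(String.format("%d. %s", i + 1,
                   content.length() > 500 ? content.substring(0, 500) : content));
       }
       String contextText = String.join("\n\n", items);

       // contextText would then be injected into the context-enhanced prompt and a
       // ChatRequestEvent emitted, exactly as processRetrievedContext does (omitted here).
   }
   ```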



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
