This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new e55516f  [api][integration][java] Enrich interface for vector store 
and implement these for elasticsearch. (#416)
e55516f is described below

commit e55516fd7a9f0740a51c27b7ea7c41cc156f4531
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jan 7 23:09:16 2026 +0800

    [api][integration][java] Enrich interface for vector store and implement 
these for elasticsearch. (#416)
---
 .../agents/api/vectorstores/BaseVectorStore.java   |  87 +++-
 .../CollectionManageableVectorStore.java           |  67 +++
 .../flink/agents/api/vectorstores/Document.java    |  52 ++
 .../agents/api/vectorstores/VectorStoreQuery.java  |  26 +-
 .../elasticsearch/ElasticsearchVectorStore.java    | 576 ++++++++++++++++++++-
 .../ElasticsearchVectorStoreTest.java              | 151 ++++++
 6 files changed, 933 insertions(+), 26 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
index de74c42..64cc745 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
@@ -23,6 +23,9 @@ import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
@@ -56,6 +59,34 @@ public abstract class BaseVectorStore extends Resource {
      */
     public abstract Map<String, Object> getStoreKwargs();
 
+    /**
+     * Add documents to vector store.
+     *
+     * @param documents The documents to be added.
+     * @param collection The name of the collection to add to. If is null, 
will add documents to the
+     *     default collection.
+     * @param extraArgs The vector store specific arguments.
+     * @return The IDs of the documents added.
+     */
+    public List<String> add(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        final BaseEmbeddingModelSetup embeddingModel =
+                (BaseEmbeddingModelSetup)
+                        this.getResource.apply(this.embeddingModel, 
ResourceType.EMBEDDING_MODEL);
+
+        for (Document doc : documents) {
+            if (doc.getEmbedding() == null) {
+                doc.setEmbedding(embeddingModel.embed(doc.getContent()));
+            }
+        }
+
+        final Map<String, Object> storeKwargs = this.getStoreKwargs();
+        storeKwargs.putAll(extraArgs);
+
+        return this.addEmbedding(documents, collection, extraArgs);
+    }
+
     /**
      * Performs vector search using structured query object. Converts text 
query to embeddings and
      * returns structured query result.
@@ -74,19 +105,69 @@ public abstract class BaseVectorStore extends Resource {
         storeKwargs.putAll(query.getExtraArgs());
 
         final List<Document> documents =
-                this.queryEmbedding(queryEmbedding, query.getLimit(), 
storeKwargs);
+                this.queryEmbedding(
+                        queryEmbedding, query.getLimit(), 
query.getCollection(), storeKwargs);
 
         return new VectorStoreQueryResult(documents);
     }
 
+    /**
+     * Get the size of the collection in vector store.
+     *
+     * @param collection The name of the collection to count. If is null, 
count the default
+     *     collection.
+     * @return The documents count in the collection.
+     */
+    public abstract long size(@Nullable String collection) throws Exception;
+
+    /**
+     * Retrieve documents from the vector store.
+     *
+     * @param ids The ids of the documents. If is null, get all the documents 
or first n documents
+     *     according to implementation specific limit.
+     * @param collection The name of the collection to be retrieved. If is 
null, retrieve the
+     *     default collection.
+     * @param extraArgs Additional arguments.
+     * @return List of documents retrieved.
+     */
+    public abstract List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException;
+
+    /**
+     * Delete documents in the vector store.
+     *
+     * @param ids The ids of the documents. If is null, delete all the 
documents.
+     * @param collection The name of the collection the documents belong to. 
If is null, use the
+     *     default collection.
+     * @param extraArgs Additional arguments.
+     */
+    public abstract void delete(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException;
+
     /**
      * Performs vector search using a pre-computed embedding.
      *
      * @param embedding The embedding vector to search with
      * @param limit Maximum number of results to return
+     * @param collection The collection to query to. If is null, query the 
default collection.
      * @param args Additional arguments for the vector search
      * @return List of documents matching the query embedding
      */
-    public abstract List<Document> queryEmbedding(
-            float[] embedding, int limit, Map<String, Object> args);
+    protected abstract List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, 
Map<String, Object> args);
+
+    /**
+     * Add documents with pre-computed embedding to vector store.
+     *
+     * @param documents The documents to be added.
+     * @param collection The name of the collection to add to. If is null, add 
to the default
+     *     collection.
+     * @param extraArgs Additional arguments.
+     * @return IDs of the added documents.
+     */
+    protected abstract List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException;
 }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/CollectionManageableVectorStore.java
 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/CollectionManageableVectorStore.java
new file mode 100644
index 0000000..0a247ae
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/CollectionManageableVectorStore.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.vectorstores;
+
+import java.util.Map;
+
+/** Base abstract class for vector store which support collection management. 
*/
+public interface CollectionManageableVectorStore {
+
+    class Collection {
+        private final String name;
+        private final Map<String, Object> metadata;
+
+        public Collection(String name, Map<String, Object> metadata) {
+            this.name = name;
+            this.metadata = metadata;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Map<String, Object> getMetadata() {
+            return metadata;
+        }
+    }
+
+    /**
+     * Get a collection, or create it if it doesn't exist.
+     *
+     * @param name The name of the collection to get or create.
+     * @param metadata The metadata of the collection.
+     * @return The retrieved or created collection.
+     */
+    Collection getOrCreateCollection(String name, Map<String, Object> 
metadata) throws Exception;
+
+    /**
+     * Get a collection by name.
+     *
+     * @param name The name of the collection to get.
+     * @return The retrieved collection.
+     */
+    Collection getCollection(String name) throws Exception;
+
+    /**
+     * Delete a collection by name.
+     *
+     * @param name The name of the collection to delete.
+     * @return The deleted collection.
+     */
+    Collection deleteCollection(String name) throws Exception;
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
index 3fccdc8..00a308c 100644
--- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
+++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/Document.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.agents.api.vectorstores;
 
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A document retrieved from vector store search.
@@ -38,10 +42,18 @@ public class Document {
     /** Document metadata such as source, author, timestamp, etc. */
     private final Map<String, Object> metadata;
 
+    private @Nullable float[] embedding;
+
     public Document(String content, Map<String, Object> metadata, String id) {
+        this(content, metadata, id, null);
+    }
+
+    public Document(
+            String content, Map<String, Object> metadata, String id, @Nullable 
float[] embedding) {
         this.content = content;
         this.metadata = metadata;
         this.id = id;
+        this.embedding = embedding;
     }
 
     public String getContent() {
@@ -55,4 +67,44 @@ public class Document {
     public String getId() {
         return id;
     }
+
+    public void setEmbedding(float[] embedding) {
+        this.embedding = embedding;
+    }
+
+    @Nullable
+    public float[] getEmbedding() {
+        return embedding;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        Document document = (Document) o;
+        return Objects.equals(id, document.id)
+                && Objects.equals(content, document.content)
+                && Objects.equals(metadata, document.metadata)
+                && Arrays.equals(embedding, document.embedding);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, content, metadata, Arrays.hashCode(embedding));
+    }
+
+    @Override
+    public String toString() {
+        return "Document{"
+                + "id='"
+                + id
+                + '\''
+                + ", content='"
+                + content
+                + '\''
+                + ", metadata="
+                + metadata
+                + ", embedding="
+                + Arrays.toString(embedding)
+                + '}';
+    }
 }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
index af31dc6..bc55682 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQuery.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.agents.api.vectorstores;
 
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -36,20 +38,26 @@ public class VectorStoreQuery {
     private final String queryText;
     /** Maximum number of documents to return. */
     private final Integer limit;
+    /** The name of the collection to query to. */
+    private final @Nullable String collection;
     /** Additional store-specific parameters. */
     private final Map<String, Object> extraArgs;
 
+    public VectorStoreQuery(String queryText, Integer limit) {
+        this(VectorStoreQueryMode.SEMANTIC, queryText, limit, null, new 
HashMap<>());
+    }
+
     /**
      * Creates a semantic-search query with default mode {@link 
VectorStoreQueryMode#SEMANTIC}.
      *
      * @param queryText the text to embed and search for
      * @param limit maximum number of results to return
+     * @param collection the collection to query to
+     * @param extraArgs store-specific additional parameters
      */
-    public VectorStoreQuery(String queryText, Integer limit) {
-        this.mode = VectorStoreQueryMode.SEMANTIC;
-        this.queryText = queryText;
-        this.limit = limit;
-        this.extraArgs = new HashMap<>();
+    public VectorStoreQuery(
+            String queryText, Integer limit, String collection, Map<String, 
Object> extraArgs) {
+        this(VectorStoreQueryMode.SEMANTIC, queryText, limit, collection, new 
HashMap<>());
     }
 
     /**
@@ -58,16 +66,19 @@ public class VectorStoreQuery {
      * @param mode the query mode
      * @param queryText the text to search for
      * @param limit maximum number of results to return
+     * @param collection the collection to query to
      * @param extraArgs store-specific additional parameters
      */
     public VectorStoreQuery(
             VectorStoreQueryMode mode,
             String queryText,
             Integer limit,
+            @Nullable String collection,
             Map<String, Object> extraArgs) {
         this.mode = mode;
         this.queryText = queryText;
         this.limit = limit;
+        this.collection = collection;
         this.extraArgs = extraArgs;
     }
 
@@ -90,4 +101,9 @@ public class VectorStoreQuery {
     public Map<String, Object> getExtraArgs() {
         return extraArgs;
     }
+
+    @Nullable
+    public String getCollection() {
+        return collection;
+    }
 }
diff --git 
a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
 
b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
index 4b63289..022f8df 100644
--- 
a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
+++ 
b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
@@ -19,9 +19,29 @@
 package org.apache.flink.agents.integrations.vectorstores.elasticsearch;
 
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.mapping.DynamicMapping;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.CountRequest;
+import co.elastic.clients.elasticsearch.core.CountResponse;
+import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
+import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.GetRequest;
+import co.elastic.clients.elasticsearch.core.GetResponse;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.MgetRequest;
+import co.elastic.clients.elasticsearch.core.MgetResponse;
 import co.elastic.clients.elasticsearch.core.SearchRequest;
 import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.get.GetResult;
+import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
 import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.ExistsRequest;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.ElasticsearchTransport;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
@@ -31,6 +51,7 @@ import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
 import org.apache.flink.agents.api.vectorstores.Document;
 import org.apache.http.Header;
 import org.apache.http.HttpHost;
@@ -41,6 +62,8 @@ import org.apache.http.message.BasicHeader;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.StringReader;
 import java.nio.charset.StandardCharsets;
@@ -91,16 +114,30 @@ import java.util.function.BiFunction;
  *     .build();
  * }</pre>
  */
-public class ElasticsearchVectorStore extends BaseVectorStore {
+public class ElasticsearchVectorStore extends BaseVectorStore
+        implements CollectionManageableVectorStore {
 
     /** Default vector dimensionality used when {@code dims} is not provided. 
*/
     public static final int DEFAULT_DIMENSION = 768;
+    /** The maximum number of documents that can be retrieved in get. */
+    public static final int MAX_RESULT_WINDOW = 10000;
+
+    public static final String DEFAULT_METADATA_FIELD = "_metadata";
+    public static final String DEFAULT_CONTENT_FIELD = "_content";
+    public static final String DEFAULT_VECTOR_FIELD = "_vector";
+    public static final String COLLECTION_METADATA_INDEX = 
"collection_metadata";
+    public static final String COLLECTION_METADATA_FIELD = "metadata";
+    public static final String COLLECTION_INDEX_NAME_FIELD = "index_name";
 
     /** Low-level Elasticsearch client used to execute search requests. */
     private final ElasticsearchClient client;
 
-    /** Target index name. */
+    /** Default index name. */
     private final String index;
+    /** Name of the content field to store the document content. */
+    private final String contentField;
+    /** Name of the metadata field to store additional metadatas. */
+    private final String metadataField;
     /** Name of the dense vector field on which KNN queries are executed. */
     private final String vectorField;
     /** Vector dimensionality of the {@link #vectorField}. */
@@ -114,6 +151,9 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
 
     private final ObjectMapper mapper = new ObjectMapper();
 
+    /** Whether the content of document is stored in content field. */
+    private final boolean storeInContentField;
+
     /**
      * Creates a new {@code ElasticsearchVectorStore} from the provided 
descriptor and resource
      * resolver.
@@ -129,9 +169,20 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
             ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
         super(descriptor, getResource);
 
+        this.storeInContentField =
+                
Objects.requireNonNullElse(descriptor.getArgument("store_in_content_field"), 
true);
+
         // Required query-related arguments
         this.index = descriptor.getArgument("index");
-        this.vectorField = descriptor.getArgument("vector_field");
+        this.vectorField =
+                Objects.requireNonNullElse(
+                        descriptor.getArgument("vector_field"), 
DEFAULT_VECTOR_FIELD);
+        this.contentField =
+                Objects.requireNonNullElse(
+                        descriptor.getArgument("content_field"), 
DEFAULT_CONTENT_FIELD);
+        this.metadataField =
+                Objects.requireNonNullElse(
+                        descriptor.getArgument("metadata_field"), 
DEFAULT_METADATA_FIELD);
         final Integer dimsArg = descriptor.getArgument("dims");
         this.dims = (dimsArg != null) ? dimsArg : DEFAULT_DIMENSION;
         this.filterQuery = descriptor.getArgument("filter_query");
@@ -146,14 +197,6 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
             }
         }
 
-        if (this.vectorField == null || this.vectorField.isEmpty()) {
-            throw new IllegalArgumentException("'vector_field' should not be 
null or empty");
-        }
-
-        if (this.index == null || this.index.isEmpty()) {
-            throw new IllegalArgumentException("'index' should not be null or 
empty");
-        }
-
         // Resolve Elasticsearch HTTP hosts. Precedence: host -> hosts -> 
default localhost
         final String hostUrl = descriptor.getArgument("host");
         final String hostsCsv = descriptor.getArgument("hosts");
@@ -205,6 +248,199 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
         this.client = new ElasticsearchClient(transport);
     }
 
+    @Override
+    public Collection getOrCreateCollection(String name, Map<String, Object> 
metadata)
+            throws Exception {
+        // Check if index exists
+        ExistsRequest existsRequest = ExistsRequest.of(e -> e.index(name));
+        boolean exists = this.client.indices().exists(existsRequest).value();
+
+        if (!exists) {
+            // Store collection metadata
+            if (metadata != null && !metadata.isEmpty()) {
+                storeCollectionMetadata(name, metadata);
+            }
+
+            // Create index correspond to the collection.
+            createIndex(name, metadata);
+        }
+
+        return new Collection(name, metadata != null ? metadata : 
Collections.emptyMap());
+    }
+
+    /**
+     * Creates an Elasticsearch index with vector field mapping.
+     *
+     * @param indexName The name of the index to create
+     * @param metadata Optional metadata for the index
+     * @throws IOException if the index creation fails
+     */
+    private void createIndex(String indexName, Map<String, Object> metadata) 
throws IOException {
+
+        // Build mappings with vector field
+        Map<String, Property> properties = new HashMap<>();
+
+        // Add vector field mapping
+        Property vectorProperty =
+                Property.of(p -> p.denseVector(dv -> 
dv.dims(this.dims).index(true)));
+        properties.put(this.vectorField, vectorProperty);
+
+        // Add text field for content
+        Property textProperty = Property.of(p -> p.text(t -> t));
+        properties.put(this.contentField, textProperty);
+
+        // Add metadata field mapping
+        Property metadataProperty = Property.of(p -> p.object(o -> o));
+        properties.put(this.metadataField, metadataProperty);
+
+        CreateIndexRequest createRequest =
+                CreateIndexRequest.of(
+                        c ->
+                                c.index(indexName)
+                                        .mappings(
+                                                m ->
+                                                        
m.properties(properties)
+                                                                
.dynamic(DynamicMapping.True)));
+
+        this.client.indices().create(createRequest);
+    }
+
+    /**
+     * Stores collection metadata in the COLLECTION_METADATA_INDEX.
+     *
+     * <p>This method creates the metadata index if it doesn't exist, and 
stores the metadata
+     * associated with the given index name. The index name is used as the 
document ID.
+     *
+     * @param indexName The name of the collection/index
+     * @param metadata The metadata to store
+     * @throws IOException if the operation fails
+     */
+    private void storeCollectionMetadata(String indexName, Map<String, Object> 
metadata)
+            throws IOException {
+        // Check if metadata index exists
+        ExistsRequest existsRequest = ExistsRequest.of(e -> 
e.index(COLLECTION_METADATA_INDEX));
+        boolean exists = this.client.indices().exists(existsRequest).value();
+
+        if (!exists) {
+            // Build mappings for metadata index
+            Map<String, Property> properties = new HashMap<>();
+
+            // Add object field for metadata (to store structured metadata)
+            Property metadataProperty = Property.of(p -> p.object(o -> o));
+            properties.put(COLLECTION_METADATA_FIELD, metadataProperty);
+
+            // Add text field for index name
+            Property indexNameProperty = Property.of(p -> p.keyword(k -> k));
+            properties.put(COLLECTION_INDEX_NAME_FIELD, indexNameProperty);
+
+            // Create metadata index
+            CreateIndexRequest createRequest =
+                    CreateIndexRequest.of(
+                            c ->
+                                    c.index(COLLECTION_METADATA_INDEX)
+                                            .mappings(
+                                                    m ->
+                                                            
m.properties(properties)
+                                                                    
.dynamic(DynamicMapping.True)));
+
+            this.client.indices().create(createRequest);
+        }
+
+        // Store metadata document (use indexName as document ID)
+        Map<String, Object> source = new HashMap<>();
+        source.put(COLLECTION_INDEX_NAME_FIELD, indexName);
+        source.put(COLLECTION_METADATA_FIELD, metadata);
+
+        IndexRequest<Map<String, Object>> indexRequest =
+                IndexRequest.of(
+                        i -> 
i.index(COLLECTION_METADATA_INDEX).id(indexName).document(source));
+
+        this.client.index(indexRequest);
+    }
+
+    /**
+     * Gets a collection by name.
+     *
+     * <p>This method retrieves the collection metadata from 
COLLECTION_METADATA_INDEX using the
+     * collection name as the document ID.
+     *
+     * @param name The name of the collection to retrieve
+     * @return The retrieved collection with its metadata
+     * @throws Exception if the collection doesn't exist or the operation fails
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Collection getCollection(String name) throws Exception {
+        // Check if index exists
+        ExistsRequest existsRequest = ExistsRequest.of(e -> e.index(name));
+        boolean exists = this.client.indices().exists(existsRequest).value();
+
+        if (!exists) {
+            throw new RuntimeException(String.format("Collection %s not 
found", name));
+        }
+
+        // Get collection metadata from COLLECTION_METADATA_INDEX
+        GetRequest getRequest = GetRequest.of(g -> 
g.index(COLLECTION_METADATA_INDEX).id(name));
+
+        GetResponse<Map<String, Object>> getResponse =
+                (GetResponse) this.client.get(getRequest, Map.class);
+
+        // Check if document exists
+        if (!getResponse.found()) {
+            throw new RuntimeException(String.format("Metadata for Collection 
%s not found", name));
+        }
+
+        // Extract metadata from the document
+        Map<String, Object> source = getResponse.source();
+        if (source == null) {
+            throw new RuntimeException(String.format("Metadata for Collection 
%s is null", name));
+        }
+
+        // Get metadata field
+        Map<String, Object> metadata =
+                (Map<String, Object>)
+                        source.getOrDefault(COLLECTION_METADATA_FIELD, 
Collections.emptyMap());
+
+        // Return collection object
+        return new Collection(name, metadata);
+    }
+
+    /**
+     * Deletes a collection by name.
+     *
+     * <p>This method deletes both the collection index and its metadata from
+     * COLLECTION_METADATA_INDEX. It first retrieves the collection metadata, 
then deletes the index
+     * and the metadata document.
+     *
+     * @param name The name of the collection to delete
+     * @return The deleted collection with its metadata
+     * @throws RuntimeException if the collection doesn't exist or the 
operation fails
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Collection deleteCollection(String name) throws Exception {
+        // First, get the collection metadata before deletion
+        // This ensures the collection exists and retrieves its metadata for 
return
+        Collection collection;
+        try {
+            collection = getCollection(name);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Collection %s not found or failed to 
retrieve", name), e);
+        }
+
+        DeleteIndexRequest deleteIndexRequest = DeleteIndexRequest.of(d -> 
d.index(name));
+        this.client.indices().delete(deleteIndexRequest);
+
+        // Delete the metadata document from COLLECTION_METADATA_INDEX
+        DeleteRequest deleteRequest =
+                DeleteRequest.of(d -> 
d.index(COLLECTION_METADATA_INDEX).id(name));
+        this.client.delete(deleteRequest);
+
+        // Return the deleted collection
+        return collection;
+    }
+
     /**
      * Returns default store-level arguments collected from the descriptor.
      *
@@ -236,6 +472,211 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
         return m;
     }
 
+    @Override
+    public long size(@Nullable String collection) throws Exception {
+        String index = collection == null ? this.index : collection;
+        CountRequest countRequest = CountRequest.of(c -> c.index(index));
+        CountResponse countResponse = this.client.count(countRequest);
+        return countResponse.count();
+    }
+
+    /**
+     * Retrieve documents from the vector store.
+     *
+     * <p>If ids is not provided, this method will retrieve documents 
according to limit, offset and
+     * filter_query in additional arguments. If limit is also not provided, 
this method will
+     * retrieve no more than {@link 
ElasticsearchVectorStore#MAX_RESULT_WINDOW} documents because of
+     * the ElasticSearch limitation.
+     *
+     * @param ids The ids of the documents.
+     * @param collection The name of the collection to be retrieved. If is 
null, retrieve the
+     *     default collection.
+     * @param extraArgs Additional arguments. (limit, offset, filter_query, 
etc.)
+     * @return List of documents retrieved.
+     */
+    @Override
+    public List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        String index = collection == null ? this.index : collection;
+
+        if (ids != null && !ids.isEmpty()) {
+            // Get specific documents by IDs
+            return getDocumentsByIds(index, ids);
+        } else {
+            // Get all documents with optional filters, limit or offset.
+            return getDocuments(index, extraArgs);
+        }
+    }
+
+    /**
+     * Delete documents in the vector store.
+     *
+     * <p>If ids is not provided, this method will delete documents matched 
the filter_query in
+     * additional arguments. If filter_query is not provided, this method will 
delete all the
+     * documents.
+     *
+     * @param ids The ids of the documents.
+     * @param collection The name of the collection the documents belong to. 
If is null, use the
+     *     default collection.
+     * @param extraArgs Additional arguments, (filter_query, etc.)
+     */
+    @Override
+    public void delete(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        String index = collection == null ? this.index : collection;
+
+        if (ids != null && !ids.isEmpty()) {
+            // Delete specific documents by IDs
+            deleteDocumentsByIds(index, ids);
+        } else {
+            // Delete all documents with optional filters
+            deleteDocuments(index, extraArgs);
+        }
+    }
+
+    /**
+     * Retrieves documents by their IDs using Elasticsearch multi-get API.
+     *
+     * @param index The index to query
+     * @param ids List of document IDs to retrieve
+     * @return List of Documents
+     * @throws IOException if the request fails
+     * @throws JsonProcessingException if JSON processing fails
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private List<Document> getDocumentsByIds(String index, List<String> ids)
+            throws IOException, JsonProcessingException {
+        MgetRequest mgetRequest = MgetRequest.of(m -> m.index(index).ids(ids));
+
+        MgetResponse<Map<String, Object>> mgetResponse =
+                (MgetResponse) this.client.mget(mgetRequest, Map.class);
+
+        List<Document> documents = new ArrayList<>();
+        for (MultiGetResponseItem<Map<String, Object>> item : 
mgetResponse.docs()) {
+            if (item.isResult()) {
+                GetResult<Map<String, Object>> getResult = item.result();
+                Map<String, Object> source = getResult.source();
+                String id = getResult.id();
+                Document document = getDocument(id, source);
+                documents.add(document);
+            }
+        }
+        return documents;
+    }
+
+    /**
+     * Retrieves documents using Elasticsearch search API with optional 
filters.
+     *
+     * @param index The index to query
+     * @param extraArgs Additional arguments (limit, offset, filter_query, 
etc.)
+     * @return List of Documents
+     * @throws IOException if the request fails
+     * @throws JsonProcessingException if JSON processing fails
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private List<Document> getDocuments(String index, Map<String, Object> 
extraArgs)
+            throws IOException, JsonProcessingException {
+        SearchRequest.Builder builder = new 
SearchRequest.Builder().index(index);
+
+        // Handle limit (size)
+        Integer limit = (Integer) extraArgs.get("limit");
+        builder.size(Objects.requireNonNullElse(limit, MAX_RESULT_WINDOW));
+
+        // Handle offset (from)
+        Integer offset = (Integer) extraArgs.get("offset");
+        if (offset != null) {
+            builder.from(offset);
+        }
+
+        // Handle filter query
+        String filter = (String) extraArgs.get("filter_query");
+        if (filter != null) {
+            builder.query(q -> q.withJson(new StringReader(filter)));
+        }
+
+        // Execute search
+        SearchResponse<Map<String, Object>> searchResponse =
+                (SearchResponse) this.client.search(builder.build(), 
Map.class);
+
+        final long total = searchResponse.hits().total().value();
+        if (total == 0) {
+            return Collections.emptyList();
+        }
+
+        return getDocuments((int) total, searchResponse);
+    }
+
+    /**
+     * Deletes documents by their IDs using Elasticsearch bulk delete API.
+     *
+     * @param index The index to delete from
+     * @param ids List of document IDs to delete
+     * @throws IOException if the request fails
+     */
+    private void deleteDocumentsByIds(String index, List<String> ids) throws 
IOException {
+        // Prepare bulk delete operations
+        List<BulkOperation> bulkOperations = new ArrayList<>();
+        for (String id : ids) {
+            bulkOperations.add(BulkOperation.of(bo -> bo.delete(d -> 
d.index(index).id(id))));
+        }
+
+        // Execute bulk delete request
+        BulkRequest bulkRequest = BulkRequest.of(br -> 
br.operations(bulkOperations));
+        BulkResponse bulkResponse = this.client.bulk(bulkRequest);
+
+        // Check for errors
+        if (bulkResponse.errors()) {
+            StringBuilder errorMsg = new StringBuilder("Some documents failed 
to delete: ");
+            bulkResponse.items().stream()
+                    .filter(item -> item.error() != null)
+                    .forEach(
+                            item ->
+                                    errorMsg.append(
+                                            String.format(
+                                                    "id=%s, error=%s; ",
+                                                    item.id(), 
item.error().reason())));
+            throw new RuntimeException(errorMsg.toString());
+        }
+    }
+
+    /**
+     * Deletes documents using Elasticsearch delete by query API.
+     *
+     * @param index The index to delete from
+     * @param extraArgs Additional arguments (filter_query, etc.)
+     * @throws IOException if the request fails
+     */
+    private void deleteDocuments(String index, Map<String, Object> extraArgs) 
throws IOException {
+        DeleteByQueryRequest.Builder builder = new 
DeleteByQueryRequest.Builder().index(index);
+
+        // Handle filter query
+        String filter = (String) extraArgs.get("filter_query");
+        if (filter != null) {
+            builder.query(q -> q.withJson(new StringReader(filter)));
+        } else {
+            // If no filter provided, delete all documents (match_all query)
+            builder.query(q -> q.matchAll(ma -> ma));
+        }
+
+        // Execute delete by query
+        DeleteByQueryResponse response = 
this.client.deleteByQuery(builder.build());
+
+        // Check for failures
+        if (response.failures() != null && !response.failures().isEmpty()) {
+            StringBuilder errorMsg = new StringBuilder("Some documents failed 
to delete: ");
+            response.failures()
+                    .forEach(
+                            failure ->
+                                    errorMsg.append(
+                                            String.format(
+                                                    "id=%s, error=%s; ",
+                                                    failure.id(), 
failure.cause().reason())));
+            throw new RuntimeException(errorMsg.toString());
+        }
+    }
+
     /**
      * Executes a KNN vector search using a pre-computed embedding.
      *
@@ -246,6 +687,7 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
      * @param embedding The embedding vector to search with
      * @param limit Maximum number of items the caller is interested in; used 
as a fallback for
      *     {@code k} if not explicitly provided
+     * @param collection The index to query search. If is null, search the 
default index.
      * @param args Additional arguments. Supported keys: {@code k}, {@code 
num_candidates}, {@code
      *     filter_query}
      * @return A list of matching documents, possibly empty
@@ -253,8 +695,10 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
-    public List<Document> queryEmbedding(float[] embedding, int limit, 
Map<String, Object> args) {
+    public List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, 
Map<String, Object> args) {
         try {
+            String index = collection == null ? this.index : collection;
             int k = (int) args.getOrDefault("k", Math.max(1, limit));
 
             int numCandidates = (int) args.getOrDefault("num_candidates", 
Math.max(100, k * 2));
@@ -265,7 +709,7 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
 
             SearchRequest.Builder builder =
                     new SearchRequest.Builder()
-                            .index(this.index)
+                            .index(index)
                             .knn(
                                     kb ->
                                             kb.field(this.vectorField)
@@ -290,6 +734,80 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
         }
     }
 
+    /**
+     * Add documents with pre-computed embedding to vector store.
+     *
+     * <p>ElasticSearch will set the content of the document to content field.
+     */
+    @Override
+    protected List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        String index = collection == null ? this.index : collection;
+        if (documents == null || documents.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        // Prepare bulk operations
+        List<BulkOperation> bulkOperations = new ArrayList<>();
+        List<String> documentIds = new ArrayList<>();
+
+        for (Document doc : documents) {
+            // Generate ID if not provided
+            String id = doc.getId();
+            if (id == null || id.isEmpty()) {
+                id = UUID.randomUUID().toString();
+            }
+            final String docId = id;
+            documentIds.add(docId);
+
+            Map<String, Object> source = new HashMap<>();
+
+            // Add embedding vector if available
+            float[] embedding = doc.getEmbedding();
+            if (embedding != null && embedding.length > 0) {
+                List<Float> embeddingList = new ArrayList<>(embedding.length);
+                for (float v : embedding) {
+                    embeddingList.add(v);
+                }
+                source.put(this.vectorField, embeddingList);
+            }
+
+            source.put(this.contentField, doc.getContent());
+
+            // Add metadata
+            Map<String, Object> metadata = doc.getMetadata();
+            if (metadata != null && !metadata.isEmpty()) {
+                source.put(this.metadataField, doc.getMetadata());
+            }
+
+            // Create index operation for bulk request
+            bulkOperations.add(
+                    BulkOperation.of(
+                            bo -> bo.index(io -> 
io.index(index).id(docId).document(source))));
+        }
+
+        // Execute bulk request
+        BulkRequest bulkRequest = BulkRequest.of(br -> 
br.operations(bulkOperations));
+        BulkResponse bulkResponse = this.client.bulk(bulkRequest);
+
+        // Check for errors
+        if (bulkResponse.errors()) {
+            StringBuilder errorMsg = new StringBuilder("Some documents failed 
to index: ");
+            bulkResponse.items().stream()
+                    .filter(item -> item.error() != null)
+                    .forEach(
+                            item ->
+                                    errorMsg.append(
+                                            String.format(
+                                                    "id=%s, error=%s; ",
+                                                    item.id(), 
item.error().reason())));
+            throw new RuntimeException(errorMsg.toString());
+        }
+
+        return documentIds;
+    }
+
     /**
      * Converts Elasticsearch hits into {@link Document} instances with 
metadata.
      *
@@ -304,13 +822,35 @@ public class ElasticsearchVectorStore extends 
BaseVectorStore {
         for (Hit<Map<String, Object>> hit : searchResponse.hits().hits()) {
             final Map<String, Object> _source = hit.source();
             final String id = hit.id();
-            final Double score = hit.score();
-            final String index = hit.index();
-            final Map<String, Object> metadata = Map.of("id", id, "score", 
score, "index", index);
-            final String content = (_source == null) ? "" : 
mapper.writeValueAsString(_source);
-            final Document document = new Document(content, metadata, id);
+            final Document document = getDocument(id, _source);
             documents.add(document);
         }
         return documents;
     }
+
+    @SuppressWarnings("unchecked")
+    private Document getDocument(String id, Map<String, Object> source)
+            throws JsonProcessingException {
+        Map<String, Object> metadata = new HashMap<>();
+        String content = "";
+        if (source != null) {
+            Map<String, Object> extra = (Map<String, Object>) 
source.remove(this.metadataField);
+            if (extra != null) {
+                metadata.putAll(extra);
+            }
+
+            // Elasticsearch supports store document as mappings. If 
storeInContentField is
+            // true,
+            // we get the content from the specific field, otherwise, we get 
the content from
+            // all
+            // the fields.
+            if (this.storeInContentField) {
+                content = (String) source.get(this.contentField);
+            } else {
+                source.remove(this.vectorField);
+                content = mapper.writeValueAsString(source);
+            }
+        }
+        return new Document(content, metadata, id);
+    }
 }
diff --git 
a/integrations/vector-stores/elasticsearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStoreTest.java
 
b/integrations/vector-stores/elasticsearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStoreTest.java
new file mode 100644
index 0000000..2aba422
--- /dev/null
+++ 
b/integrations/vector-stores/elasticsearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStoreTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.elasticsearch;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore.Collection;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link ElasticsearchVectorStore}
+ *
+ * <p>Need setup Elasticsearch server to run this test. Look <a
+ * 
href="https://www.elastic.co/docs/deploy-manage/deploy/self-managed/install-elasticsearch-docker-basic";>Start
+ * a single-node cluster in Docker</a> for details.
+ *
+ * <p>For {@link ElasticsearchVectorStore} doesn't support security check yet, 
when start the
+ * container, should add "-e xpack.security.enabled=false" option.
+ */
+@Disabled("Should setup Elasticsearch server.")
+public class ElasticsearchVectorStoreTest {
+    public static BaseVectorStore store;
+
+    public static Resource getResource(String name, ResourceType type) {
+        BaseEmbeddingModelSetup embeddingModel = 
Mockito.mock(BaseEmbeddingModelSetup.class);
+        Mockito.when(embeddingModel.embed("Elasticsearch is a search engine"))
+                .thenReturn(new float[] {0.2f, 0.3f, 0.4f, 0.5f, 0.6f});
+        Mockito.when(
+                        embeddingModel.embed(
+                                "Apache Flink Agents is an Agentic AI 
framework based on Apache Flink."))
+                .thenReturn(new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+        return embeddingModel;
+    }
+
+    @BeforeAll
+    public static void initialize() {
+        final ResourceDescriptor.Builder builder =
+                
ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", 
"embeddingModel")
+                        .addInitialArgument("host", "localhost:9200")
+                        .addInitialArgument("dims", 5)
+                        .addInitialArgument("username", "elastic")
+                        .addInitialArgument("password", 
System.getenv("ES_PASSWORD"));
+        ;
+        store =
+                new ElasticsearchVectorStore(
+                        builder.build(), 
ElasticsearchVectorStoreTest::getResource);
+    }
+
+    @Test
+    public void testCollectionManagement() throws Exception {
+        CollectionManageableVectorStore vectorStore = 
(CollectionManageableVectorStore) store;
+        String name = "collection_management";
+        Map<String, Object> metadata = Map.of("key1", "value1", "key2", 
"value2");
+        vectorStore.getOrCreateCollection(name, metadata);
+
+        Collection collection = vectorStore.getCollection(name);
+
+        Assertions.assertNotNull(collection);
+        Assertions.assertEquals(name, collection.getName());
+        Assertions.assertEquals(0, store.size(name));
+        Assertions.assertEquals(metadata, collection.getMetadata());
+
+        vectorStore.deleteCollection(name);
+
+        Assertions.assertThrows(
+                RuntimeException.class,
+                () -> vectorStore.getCollection(name),
+                String.format("Collection %s not found", name));
+    }
+
+    @Test
+    public void testDocumentManagement() throws Exception {
+        String name = "document_management";
+        Map<String, Object> metadata = Map.of("key1", "value1", "key2", 
"value2");
+        ((CollectionManageableVectorStore) store).getOrCreateCollection(name, 
metadata);
+
+        List<Document> documents = new ArrayList<>();
+        documents.add(
+                new Document(
+                        "Elasticsearch is a search engine",
+                        Map.of("category", "database", "source", "test"),
+                        "doc1"));
+        documents.add(
+                new Document(
+                        "Apache Flink Agents is an Agentic AI framework based 
on Apache Flink.",
+                        Map.of("category", "ai-agent", "source", "test"),
+                        "doc2"));
+        store.add(documents, name, Collections.emptyMap());
+        // wait for the documents to become visible in the elasticsearch 
server.
+        Thread.sleep(1000);
+        for (Document doc : documents) {
+            doc.setEmbedding(null);
+        }
+
+        // test get all documents
+        List<Document> all = store.get(null, name, Collections.emptyMap());
+        Assertions.assertEquals(documents, all);
+
+        // test get specific document
+        List<Document> specific =
+                store.get(Collections.singletonList("doc1"), name, 
Collections.emptyMap());
+        Assertions.assertEquals(1, specific.size());
+        Assertions.assertEquals(documents.get(0), specific.get(0));
+
+        // test delete specific document
+        store.delete(Collections.singletonList("doc1"), name, 
Collections.emptyMap());
+        Thread.sleep(1000);
+        List<Document> remain = store.get(null, name, Collections.emptyMap());
+        Assertions.assertEquals(1, remain.size());
+        Assertions.assertEquals(documents.get(1), remain.get(0));
+
+        // test delete all documents
+        store.delete(null, name, Collections.emptyMap());
+        Thread.sleep(1000);
+        List<Document> empty = store.get(null, name, Collections.emptyMap());
+        Assertions.assertTrue(empty.isEmpty());
+
+        ((CollectionManageableVectorStore) store).deleteCollection(name);
+    }
+}

Reply via email to