This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 862ed2b  [FLINK-AGENTS-524] Add Amazon OpenSearch and S3 Vectors 
vector store integrations (#533)
862ed2b is described below

commit 862ed2bf186343c46699905be8682400917f7470
Author: Avichay Marciano <[email protected]>
AuthorDate: Thu Mar 5 10:53:26 2026 +0200

    [FLINK-AGENTS-524] Add Amazon OpenSearch and S3 Vectors vector store 
integrations (#533)
    
    - OpenSearchVectorStore: supports Serverless (AOSS) and Service domains, 
IAM and basic auth, SigV4 signing, bulk indexing, collection management for 
long-term memory
    - S3VectorsVectorStore: PutVectors/QueryVectors/GetVectors/DeleteVectors, 
batched puts (500/request limit)
    - Retry via unified RetryExecutor with service-specific retryable predicates
---
 dist/pom.xml                                       |  10 +
 .../vector-stores/{ => opensearch}/pom.xml         |  38 +-
 .../opensearch/OpenSearchVectorStore.java          | 558 +++++++++++++++++++++
 .../opensearch/OpenSearchVectorStoreTest.java      | 237 +++++++++
 integrations/vector-stores/pom.xml                 |   2 +
 integrations/vector-stores/{ => s3vectors}/pom.xml |  33 +-
 .../s3vectors/S3VectorsVectorStore.java            | 338 +++++++++++++
 .../s3vectors/S3VectorsVectorStoreTest.java        | 135 +++++
 8 files changed, 1335 insertions(+), 16 deletions(-)

diff --git a/dist/pom.xml b/dist/pom.xml
index b4e479a..16ed7fc 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -99,6 +99,16 @@ under the License.
             
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-vector-stores-opensearch</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-vector-stores-s3vectors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-agents-integrations-mcp</artifactId>
diff --git a/integrations/vector-stores/pom.xml 
b/integrations/vector-stores/opensearch/pom.xml
similarity index 50%
copy from integrations/vector-stores/pom.xml
copy to integrations/vector-stores/opensearch/pom.xml
index 7b9612d..6c34b88 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/opensearch/pom.xml
@@ -22,16 +22,38 @@ under the License.
 
     <parent>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-agents-integrations</artifactId>
+        <artifactId>flink-agents-integrations-vector-stores</artifactId>
         <version>0.3-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>flink-agents-integrations-vector-stores</artifactId>
-    <name>Flink Agents : Integrations: Vector Stores</name>
-    <packaging>pom</packaging>
+    <artifactId>flink-agents-integrations-vector-stores-opensearch</artifactId>
+    <name>Flink Agents : Integrations: Vector Stores: OpenSearch</name>
+    <packaging>jar</packaging>
 
-    <modules>
-        <module>elasticsearch</module>
-    </modules>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
-</project>
\ No newline at end of file
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>1.3.9</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
 
b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
new file mode 100644
index 0000000..a6fd2a0
--- /dev/null
+++ 
b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.opensearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.RetryExecutor;
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.HttpExecuteResponse;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+/**
+ * OpenSearch vector store supporting both OpenSearch Serverless (AOSS) and 
OpenSearch Service
+ * domains, with IAM (SigV4) or basic auth.
+ *
+ * <p>Implements {@link CollectionManageableVectorStore} for Long-Term Memory 
support. Collections
+ * map to OpenSearch indices. Collection metadata is stored in a dedicated 
{@code
+ * flink_agents_collection_metadata} index.
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ *   <li><b>embedding_model</b> (required): name of the embedding model 
resource
+ *   <li><b>endpoint</b> (required): OpenSearch endpoint URL
+ *   <li><b>index</b> (required): default index name
+ *   <li><b>service_type</b> (optional): "serverless" (default) or "domain"
+ *   <li><b>auth</b> (optional): "iam" (default) or "basic"
+ *   <li><b>username</b> (required if auth=basic): basic auth username
+ *   <li><b>password</b> (required if auth=basic): basic auth password
+ *   <li><b>vector_field</b> (optional): vector field name (default: 
"embedding")
+ *   <li><b>content_field</b> (optional): content field name (default: 
"content")
+ *   <li><b>region</b> (optional): AWS region (default: us-east-1)
+ *   <li><b>dims</b> (optional): vector dimensions for index creation 
(default: 1024)
+ *   <li><b>max_bulk_mb</b> (optional): max bulk payload size in MB (default: 
5)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @VectorStore
+ * public static ResourceDescriptor opensearchStore() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ *             .addInitialArgument("embedding_model", "bedrockEmbeddingSetup")
+ *             .addInitialArgument("endpoint", 
"https://my-domain.us-east-1.es.amazonaws.com")
+ *             .addInitialArgument("index", "my-vectors")
+ *             .addInitialArgument("service_type", "domain")
+ *             .addInitialArgument("auth", "iam")
+ *             .addInitialArgument("dims", 1024)
+ *             .build();
+ * }
+ * }</pre>
+ */
+public class OpenSearchVectorStore extends BaseVectorStore
+        implements CollectionManageableVectorStore {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final String METADATA_INDEX = 
"flink_agents_collection_metadata";
+
+    private static final int DEFAULT_GET_LIMIT = 10000;
+
+    private final String endpoint;
+    private final String index;
+    private final String vectorField;
+    private final String contentField;
+    private final int dims;
+    private final Region region;
+    private final boolean serverless;
+    private final boolean useIamAuth;
+    private final String basicAuthHeader;
+    private final int maxBulkBytes;
+
+    private final SdkHttpClient httpClient;
+    // TODO: Aws4Signer is legacy; migrate to AwsV4HttpSigner from 
http-auth-aws in a follow-up.
+    private final Aws4Signer signer;
+    private final DefaultCredentialsProvider credentialsProvider;
+    private final RetryExecutor retryExecutor;
+
+    public OpenSearchVectorStore(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
+        super(descriptor, getResource);
+
+        this.endpoint = descriptor.getArgument("endpoint");
+        if (this.endpoint == null || this.endpoint.isBlank()) {
+            throw new IllegalArgumentException("endpoint is required for 
OpenSearchVectorStore");
+        }
+
+        this.index = descriptor.getArgument("index");
+
+        this.vectorField =
+                
Objects.requireNonNullElse(descriptor.getArgument("vector_field"), "embedding");
+        this.contentField =
+                
Objects.requireNonNullElse(descriptor.getArgument("content_field"), "content");
+        Integer dimsArg = descriptor.getArgument("dims");
+        this.dims = dimsArg != null ? dimsArg : 1024;
+
+        String regionStr = descriptor.getArgument("region");
+        this.region = Region.of(regionStr != null ? regionStr : "us-east-1");
+
+        String serviceType =
+                
Objects.requireNonNullElse(descriptor.getArgument("service_type"), 
"serverless");
+        this.serverless = serviceType.equalsIgnoreCase("serverless");
+
+        String auth = 
Objects.requireNonNullElse(descriptor.getArgument("auth"), "iam");
+        this.useIamAuth = auth.equalsIgnoreCase("iam");
+
+        if (!useIamAuth) {
+            String username = descriptor.getArgument("username");
+            String password = descriptor.getArgument("password");
+            if (username == null || password == null) {
+                throw new IllegalArgumentException("username and password 
required for basic auth");
+            }
+            this.basicAuthHeader =
+                    "Basic "
+                            + Base64.getEncoder()
+                                    .encodeToString(
+                                            (username + ":" + password)
+                                                    
.getBytes(StandardCharsets.UTF_8));
+        } else {
+            this.basicAuthHeader = null;
+        }
+
+        this.httpClient = ApacheHttpClient.create();
+        this.signer = Aws4Signer.create();
+        this.credentialsProvider = 
DefaultCredentialsProvider.builder().build();
+
+        Integer bulkMb = descriptor.getArgument("max_bulk_mb");
+        this.maxBulkBytes = (bulkMb != null ? bulkMb : 5) * 1024 * 1024;
+
+        this.retryExecutor =
+                RetryExecutor.builder()
+                        .maxRetries(5)
+                        .initialBackoffMs(200)
+                        
.retryablePredicate(OpenSearchVectorStore::isRetryableStatus)
+                        .build();
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.httpClient.close();
+        this.credentialsProvider.close();
+    }
+
+    /**
+     * Batch-embeds all documents in a single call, then delegates to 
addEmbedding.
+     *
+     * <p>TODO: This batch embedding logic is duplicated in 
S3VectorsVectorStore. Consider
+     * extracting to BaseVectorStore in a follow-up (would also benefit 
ElasticsearchVectorStore).
+     */
+    @Override
+    public List<String> add(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        BaseEmbeddingModelSetup emb =
+                (BaseEmbeddingModelSetup)
+                        this.getResource.apply(this.embeddingModel, 
ResourceType.EMBEDDING_MODEL);
+        List<String> texts = new ArrayList<>();
+        List<Integer> needsEmbedding = new ArrayList<>();
+        for (int i = 0; i < documents.size(); i++) {
+            if (documents.get(i).getEmbedding() == null) {
+                texts.add(documents.get(i).getContent());
+                needsEmbedding.add(i);
+            }
+        }
+        if (!texts.isEmpty()) {
+            List<float[]> embeddings = emb.embed(texts);
+            for (int j = 0; j < needsEmbedding.size(); j++) {
+                
documents.get(needsEmbedding.get(j)).setEmbedding(embeddings.get(j));
+            }
+        }
+        return this.addEmbedding(documents, collection, extraArgs);
+    }
+
+    // ---- CollectionManageableVectorStore ----
+
+    @Override
+    public Collection getOrCreateCollection(String name, Map<String, Object> 
metadata)
+            throws Exception {
+        String idx = sanitizeIndexName(name);
+        if (!indexExists(idx)) {
+            createKnnIndex(idx);
+        }
+        ensureMetadataIndex();
+        ObjectNode doc = MAPPER.createObjectNode();
+        doc.put("collection_name", name);
+        doc.set("metadata", MAPPER.valueToTree(metadata));
+        executeRequest("PUT", "/" + METADATA_INDEX + "/_doc/" + idx, 
doc.toString());
+        executeRequest("POST", "/" + METADATA_INDEX + "/_refresh", null);
+        return new Collection(name, metadata != null ? metadata : 
Collections.emptyMap());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Collection getCollection(String name) throws Exception {
+        String idx = sanitizeIndexName(name);
+        if (!indexExists(idx)) {
+            throw new RuntimeException("Collection " + name + " not found");
+        }
+        try {
+            ensureMetadataIndex();
+            JsonNode resp = executeRequest("GET", "/" + METADATA_INDEX + 
"/_doc/" + idx, null);
+            if (resp.has("found") && resp.get("found").asBoolean()) {
+                Map<String, Object> meta =
+                        
MAPPER.convertValue(resp.path("_source").path("metadata"), Map.class);
+                return new Collection(name, meta != null ? meta : 
Collections.emptyMap());
+            }
+        } catch (RuntimeException e) {
+            // metadata index may not exist yet; only ignore 404s
+            if (!e.getMessage().contains("(404)")) {
+                throw e;
+            }
+        }
+        return new Collection(name, Collections.emptyMap());
+    }
+
+    @Override
+    public Collection deleteCollection(String name) throws Exception {
+        String idx = sanitizeIndexName(name);
+        Collection col = getCollection(name);
+        executeRequest("DELETE", "/" + idx, null);
+        try {
+            executeRequest("DELETE", "/" + METADATA_INDEX + "/_doc/" + idx, 
null);
+        } catch (RuntimeException e) {
+            // metadata doc may not exist; only ignore 404s
+            if (!e.getMessage().contains("(404)")) {
+                throw e;
+            }
+        }
+        return col;
+    }
+
+    private boolean indexExists(String idx) {
+        try {
+            executeRequest("HEAD", "/" + idx, null);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    private void createKnnIndex(String idx) {
+        String body =
+                String.format(
+                        "{\"settings\":{\"index\":{\"knn\":true}},"
+                                + 
"\"mappings\":{\"properties\":{\"%s\":{\"type\":\"knn_vector\","
+                                + 
"\"dimension\":%d},\"%s\":{\"type\":\"text\"},"
+                                + "\"metadata\":{\"type\":\"object\"}}}}",
+                        vectorField, dims, contentField);
+        try {
+            executeRequest("PUT", "/" + idx, body);
+        } catch (RuntimeException e) {
+            if (!e.getMessage().contains("resource_already_exists_exception")) 
{
+                throw e;
+            }
+        }
+    }
+
+    private void ensureMetadataIndex() {
+        if (!indexExists(METADATA_INDEX)) {
+            try {
+                executeRequest(
+                        "PUT",
+                        "/" + METADATA_INDEX,
+                        
"{\"mappings\":{\"properties\":{\"collection_name\":{\"type\":\"keyword\"},"
+                                + "\"metadata\":{\"type\":\"object\"}}}}");
+            } catch (RuntimeException e) {
+                if 
(!e.getMessage().contains("resource_already_exists_exception")) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /** Sanitize collection name to valid OpenSearch index name (lowercase, no 
special chars). */
+    private String sanitizeIndexName(String name) {
+        return name.toLowerCase(Locale.ROOT)
+                .replaceAll("[^a-z0-9\\-_]", "-")
+                .replaceAll("^[^a-z]+", "a-");
+    }
+
+    // ---- BaseVectorStore ----
+
+    @Override
+    public Map<String, Object> getStoreKwargs() {
+        Map<String, Object> m = new HashMap<>();
+        m.put("index", index);
+        m.put("vector_field", vectorField);
+        return m;
+    }
+
+    @Override
+    public long size(@Nullable String collection) throws Exception {
+        String idx = collection != null ? sanitizeIndexName(collection) : 
this.index;
+        JsonNode response = executeRequest("GET", "/" + idx + "/_count", null);
+        return response.get("count").asLong();
+    }
+
+    @Override
+    public List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        String idx = collection != null ? sanitizeIndexName(collection) : 
this.index;
+        if (ids != null && !ids.isEmpty()) {
+            ObjectNode body = MAPPER.createObjectNode();
+            ArrayNode idsArray = 
body.putObject("query").putObject("ids").putArray("values");
+            ids.forEach(idsArray::add);
+            body.put("size", ids.size());
+            return parseHits(executeRequest("POST", "/" + idx + "/_search", 
body.toString()));
+        }
+        int limit = DEFAULT_GET_LIMIT;
+        if (extraArgs != null && extraArgs.containsKey("limit")) {
+            limit = ((Number) extraArgs.get("limit")).intValue();
+        }
+        return parseHits(
+                executeRequest(
+                        "POST",
+                        "/" + idx + "/_search",
+                        "{\"query\":{\"match_all\":{}},\"size\":" + limit + 
"}"));
+    }
+
+    @Override
+    public void delete(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        String idx = collection != null ? sanitizeIndexName(collection) : 
this.index;
+        if (ids != null && !ids.isEmpty()) {
+            ObjectNode body = MAPPER.createObjectNode();
+            ArrayNode idsArray = 
body.putObject("query").putObject("ids").putArray("values");
+            ids.forEach(idsArray::add);
+            executeRequest("POST", "/" + idx + "/_delete_by_query", 
body.toString());
+        } else {
+            executeRequest(
+                    "POST", "/" + idx + "/_delete_by_query", 
"{\"query\":{\"match_all\":{}}}");
+        }
+        executeRequest("POST", "/" + idx + "/_refresh", null);
+    }
+
+    @Override
+    protected List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, 
Map<String, Object> args) {
+        try {
+            String idx = collection != null ? sanitizeIndexName(collection) : 
this.index;
+            int k = (int) args.getOrDefault("k", Math.max(1, limit));
+
+            ObjectNode body = MAPPER.createObjectNode();
+            body.put("size", k);
+            ObjectNode knnQuery = body.putObject("query").putObject("knn");
+            ObjectNode fieldQuery = knnQuery.putObject(vectorField);
+            ArrayNode vectorArray = fieldQuery.putArray("vector");
+            for (float v : embedding) {
+                vectorArray.add(v);
+            }
+            fieldQuery.put("k", k);
+            if (args.containsKey("min_score")) {
+                fieldQuery.put("min_score", ((Number) 
args.get("min_score")).floatValue());
+            }
+            if (args.containsKey("ef_search")) {
+                fieldQuery
+                        .putObject("method_parameters")
+                        .put("ef_search", ((Number) 
args.get("ef_search")).intValue());
+            }
+            if (args.containsKey("filter_query")) {
+                fieldQuery.set("filter", MAPPER.readTree((String) 
args.get("filter_query")));
+            }
+
+            return parseHits(executeRequest("POST", "/" + idx + "/_search", 
body.toString()));
+        } catch (Exception e) {
+            throw new RuntimeException("OpenSearch KNN search failed.", e);
+        }
+    }
+
+    @Override
+    protected List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        String idx = collection != null ? sanitizeIndexName(collection) : 
this.index;
+        if (!indexExists(idx)) {
+            createKnnIndex(idx);
+        }
+        List<String> allIds = new ArrayList<>();
+        StringBuilder bulk = new StringBuilder();
+        int bulkBytes = 0;
+
+        for (Document doc : documents) {
+            String id = doc.getId() != null ? doc.getId() : 
UUID.randomUUID().toString();
+            allIds.add(id);
+
+            ObjectNode action = MAPPER.createObjectNode();
+            action.putObject("index").put("_index", idx).put("_id", id);
+            String actionLine = action.toString() + "\n";
+
+            ObjectNode source = MAPPER.createObjectNode();
+            source.put(contentField, doc.getContent());
+            if (doc.getEmbedding() != null) {
+                ArrayNode vec = source.putArray(vectorField);
+                for (float v : doc.getEmbedding()) {
+                    vec.add(v);
+                }
+            }
+            if (doc.getMetadata() != null) {
+                source.set("metadata", MAPPER.valueToTree(doc.getMetadata()));
+            }
+            String sourceLine = source.toString() + "\n";
+
+            int entryBytes = actionLine.length() + sourceLine.length();
+
+            if (bulkBytes > 0 && bulkBytes + entryBytes > maxBulkBytes) {
+                executeRequest("POST", "/_bulk", bulk.toString());
+                bulk.setLength(0);
+                bulkBytes = 0;
+            }
+
+            bulk.append(actionLine).append(sourceLine);
+            bulkBytes += entryBytes;
+        }
+
+        if (bulkBytes > 0) {
+            executeRequest("POST", "/_bulk", bulk.toString());
+        }
+        executeRequest("POST", "/" + idx + "/_refresh", null);
+        return allIds;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<Document> parseHits(JsonNode response) {
+        List<Document> docs = new ArrayList<>();
+        JsonNode hits = response.path("hits").path("hits");
+        for (JsonNode hit : hits) {
+            String id = hit.get("_id").asText();
+            JsonNode source = hit.get("_source");
+            String content = source.has(contentField) ? 
source.get(contentField).asText() : "";
+            Map<String, Object> metadata = new HashMap<>();
+            if (source.has("metadata")) {
+                metadata = MAPPER.convertValue(source.get("metadata"), 
Map.class);
+            }
+            docs.add(new Document(content, metadata, id));
+        }
+        return docs;
+    }
+
+    private JsonNode executeRequest(String method, String path, @Nullable 
String body) {
+        return retryExecutor.execute(
+                () -> doExecuteRequest(method, path, body), 
"OpenSearchRequest");
+    }
+
+    private static boolean isRetryableStatus(Exception e) {
+        String msg = e.getMessage();
+        return msg != null
+                && (msg.contains("(429)") || msg.contains("(503)") || 
msg.contains("(502)"));
+    }
+
+    private JsonNode doExecuteRequest(String method, String path, @Nullable 
String body) {
+        try {
+            URI uri = URI.create(endpoint + path);
+            SdkHttpFullRequest.Builder reqBuilder =
+                    SdkHttpFullRequest.builder()
+                            .uri(uri)
+                            .method(SdkHttpMethod.valueOf(method))
+                            .putHeader("Content-Type", "application/json");
+
+            if (body != null) {
+                reqBuilder.contentStreamProvider(
+                        () -> new 
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+            }
+
+            SdkHttpFullRequest request;
+            if (useIamAuth) {
+                AwsCredentials credentials = 
credentialsProvider.resolveCredentials();
+                Aws4SignerParams signerParams =
+                        Aws4SignerParams.builder()
+                                .awsCredentials(credentials)
+                                .signingName(serverless ? "aoss" : "es")
+                                .signingRegion(region)
+                                .build();
+                request = signer.sign(reqBuilder.build(), signerParams);
+            } else {
+                request = reqBuilder.putHeader("Authorization", 
basicAuthHeader).build();
+            }
+
+            HttpExecuteRequest.Builder execBuilder = 
HttpExecuteRequest.builder().request(request);
+            if (request.contentStreamProvider().isPresent()) {
+                
execBuilder.contentStreamProvider(request.contentStreamProvider().get());
+            }
+
+            HttpExecuteResponse response = 
httpClient.prepareRequest(execBuilder.build()).call();
+            int statusCode = response.httpResponse().statusCode();
+
+            if ("HEAD".equals(method)) {
+                if (statusCode >= 400) {
+                    throw new RuntimeException(
+                            "OpenSearch HEAD request failed (" + statusCode + 
")");
+                }
+                return MAPPER.createObjectNode().put("status", statusCode);
+            }
+
+            String responseBody = new 
String(response.responseBody().orElseThrow().readAllBytes());
+
+            if (statusCode >= 400) {
+                throw new RuntimeException(
+                        "OpenSearch request failed (" + statusCode + "): " + 
responseBody);
+            }
+            return MAPPER.readTree(responseBody);
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException("OpenSearch request failed.", e);
+        }
+    }
+}
diff --git 
a/integrations/vector-stores/opensearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStoreTest.java
 
b/integrations/vector-stores/opensearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStoreTest.java
new file mode 100644
index 0000000..6e99cc0
--- /dev/null
+++ 
b/integrations/vector-stores/opensearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStoreTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.opensearch;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link OpenSearchVectorStore}.
+ *
+ * <p>Integration tests require an OpenSearch Serverless collection or domain. 
Set
+ * OPENSEARCH_ENDPOINT environment variable to run.
+ */
+public class OpenSearchVectorStoreTest {
+
+    private static final BiFunction<String, ResourceType, Resource> NOOP = (a, 
b) -> null;
+
+    @Test
+    @DisplayName("Constructor creates store with IAM auth")
+    void testConstructorIamAuth() {
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument(
+                                "endpoint", 
"https://example.aoss.us-east-1.amazonaws.com";)
+                        .addInitialArgument("index", "test-index")
+                        .addInitialArgument("region", "us-east-1")
+                        .addInitialArgument("service_type", "serverless")
+                        .addInitialArgument("auth", "iam")
+                        .build();
+        OpenSearchVectorStore store = new OpenSearchVectorStore(desc, NOOP);
+        assertThat(store).isInstanceOf(BaseVectorStore.class);
+        assertThat(store).isInstanceOf(CollectionManageableVectorStore.class);
+    }
+
+    @Test
+    @DisplayName("Constructor creates store with basic auth")
+    void testConstructorBasicAuth() {
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument(
+                                "endpoint", 
"https://my-domain.us-east-1.es.amazonaws.com";)
+                        .addInitialArgument("index", "test-index")
+                        .addInitialArgument("region", "us-east-1")
+                        .addInitialArgument("service_type", "domain")
+                        .addInitialArgument("auth", "basic")
+                        .addInitialArgument("username", "admin")
+                        .addInitialArgument("password", "password")
+                        .build();
+        OpenSearchVectorStore store = new OpenSearchVectorStore(desc, NOOP);
+        assertThat(store).isInstanceOf(BaseVectorStore.class);
+    }
+
+    @Test
+    @DisplayName("Constructor with custom max_bulk_mb")
+    void testConstructorCustomBulkSize() {
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument(
+                                "endpoint", 
"https://example.aoss.us-east-1.amazonaws.com";)
+                        .addInitialArgument("index", "test-index")
+                        .addInitialArgument("max_bulk_mb", 10)
+                        .build();
+        OpenSearchVectorStore store = new OpenSearchVectorStore(desc, NOOP);
+        assertThat(store.getStoreKwargs()).containsEntry("index", 
"test-index");
+    }
+
+    @Test
+    @DisplayName("Basic auth requires username and password")
+    void testBasicAuthRequiresCredentials() {
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument("endpoint", "https://example.com";)
+                        .addInitialArgument("index", "test")
+                        .addInitialArgument("auth", "basic")
+                        .build();
+        Assertions.assertThrows(
+                IllegalArgumentException.class, () -> new 
OpenSearchVectorStore(desc, NOOP));
+    }
+
+    // --- Integration tests (require real OpenSearch) ---
+
+    private static BaseVectorStore store;
+
+    private static Resource getResource(String name, ResourceType type) {
+        BaseEmbeddingModelSetup emb = 
Mockito.mock(BaseEmbeddingModelSetup.class);
+        Mockito.when(emb.embed("OpenSearch is a search engine"))
+                .thenReturn(new float[] {0.2f, 0.3f, 0.4f, 0.5f, 0.6f});
+        Mockito.when(emb.embed("Flink Agents is an AI framework"))
+                .thenReturn(new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+        Mockito.when(emb.embed("search engine"))
+                .thenReturn(new float[] {0.2f, 0.3f, 0.4f, 0.5f, 0.6f});
+        Mockito.when(emb.embed(Mockito.anyList()))
+                .thenAnswer(
+                        inv -> {
+                            List<String> texts = inv.getArgument(0);
+                            List<float[]> result = new ArrayList<>();
+                            for (String t : texts) {
+                                result.add(emb.embed(t));
+                            }
+                            return result;
+                        });
+        return emb;
+    }
+
+    @BeforeAll
+    static void initialize() {
+        String endpoint = System.getenv("OPENSEARCH_ENDPOINT");
+        if (endpoint == null) return;
+        String auth = System.getenv().getOrDefault("OPENSEARCH_AUTH", "iam");
+        ResourceDescriptor.Builder builder =
+                
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument("endpoint", endpoint)
+                        .addInitialArgument("index", "test-opensearch")
+                        .addInitialArgument("dims", 5)
+                        .addInitialArgument(
+                                "region", 
System.getenv().getOrDefault("AWS_REGION", "us-east-1"))
+                        .addInitialArgument(
+                                "service_type",
+                                System.getenv()
+                                        
.getOrDefault("OPENSEARCH_SERVICE_TYPE", "serverless"))
+                        .addInitialArgument("auth", auth);
+        if ("basic".equals(auth)) {
+            builder.addInitialArgument("username", 
System.getenv("OPENSEARCH_USERNAME"));
+            builder.addInitialArgument("password", 
System.getenv("OPENSEARCH_PASSWORD"));
+        }
+        store = new OpenSearchVectorStore(builder.build(), 
OpenSearchVectorStoreTest::getResource);
+    }
+
+    @Test
+    @EnabledIfEnvironmentVariable(named = "OPENSEARCH_ENDPOINT", matches = 
".+")
+    @DisplayName("Collection management: create, get, delete")
+    void testCollectionManagement() throws Exception {
+        CollectionManageableVectorStore vs = (CollectionManageableVectorStore) 
store;
+        String name = "test_collection";
+        Map<String, Object> metadata = Map.of("key1", "value1");
+        vs.getOrCreateCollection(name, metadata);
+
+        CollectionManageableVectorStore.Collection col = 
vs.getCollection(name);
+        Assertions.assertNotNull(col);
+        Assertions.assertEquals(name, col.getName());
+
+        vs.deleteCollection(name);
+    }
+
+    @Test
+    @EnabledIfEnvironmentVariable(named = "OPENSEARCH_ENDPOINT", matches = 
".+")
+    @DisplayName("Document management: add, get, delete")
+    void testDocumentManagement() throws Exception {
+        String name = "test_docs";
+        ((CollectionManageableVectorStore) store).getOrCreateCollection(name, 
Map.of());
+
+        List<Document> docs = new ArrayList<>();
+        docs.add(new Document("OpenSearch is a search engine", Map.of("src", 
"test"), "doc1"));
+        docs.add(new Document("Flink Agents is an AI framework", Map.of("src", 
"test"), "doc2"));
+        store.add(docs, name, Collections.emptyMap());
+        Thread.sleep(1000);
+
+        List<Document> all = store.get(null, name, Collections.emptyMap());
+        Assertions.assertEquals(2, all.size());
+
+        store.delete(Collections.singletonList("doc1"), name, 
Collections.emptyMap());
+        Thread.sleep(1000);
+        List<Document> remaining = store.get(null, name, 
Collections.emptyMap());
+        Assertions.assertEquals(1, remaining.size());
+
+        ((CollectionManageableVectorStore) store).deleteCollection(name);
+    }
+
+    @Test
+    @EnabledIfEnvironmentVariable(named = "OPENSEARCH_ENDPOINT", matches = 
".+")
+    @DisplayName("Query with filter_query restricts results")
+    void testQueryWithFilter() throws Exception {
+        String name = "test_filter";
+        ((CollectionManageableVectorStore) store).getOrCreateCollection(name, 
Map.of());
+
+        List<Document> docs = new ArrayList<>();
+        docs.add(new Document("OpenSearch is a search engine", Map.of("src", 
"web"), "f1"));
+        docs.add(new Document("Flink Agents is an AI framework", Map.of("src", 
"code"), "f2"));
+        store.add(docs, name, Collections.emptyMap());
+        Thread.sleep(1000);
+
+        // Query with filter: only src=web
+        VectorStoreQuery q =
+                new VectorStoreQuery(
+                        "search engine",
+                        5,
+                        name,
+                        Map.of("filter_query", 
"{\"term\":{\"metadata.src.keyword\":\"web\"}}"));
+        List<Document> results = store.query(q).getDocuments();
+        Assertions.assertFalse(results.isEmpty());
+        Assertions.assertTrue(
+                results.stream().allMatch(d -> 
"web".equals(d.getMetadata().get("src"))));
+
+        ((CollectionManageableVectorStore) store).deleteCollection(name);
+    }
+}
diff --git a/integrations/vector-stores/pom.xml 
b/integrations/vector-stores/pom.xml
index 7b9612d..4d4766d 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/pom.xml
@@ -32,6 +32,8 @@ under the License.
 
     <modules>
         <module>elasticsearch</module>
+        <module>opensearch</module>
+        <module>s3vectors</module>
     </modules>
 
 </project>
\ No newline at end of file
diff --git a/integrations/vector-stores/pom.xml 
b/integrations/vector-stores/s3vectors/pom.xml
similarity index 55%
copy from integrations/vector-stores/pom.xml
copy to integrations/vector-stores/s3vectors/pom.xml
index 7b9612d..64fbf87 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/s3vectors/pom.xml
@@ -22,16 +22,33 @@ under the License.
 
     <parent>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-agents-integrations</artifactId>
+        <artifactId>flink-agents-integrations-vector-stores</artifactId>
         <version>0.3-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>flink-agents-integrations-vector-stores</artifactId>
-    <name>Flink Agents : Integrations: Vector Stores</name>
-    <packaging>pom</packaging>
+    <artifactId>flink-agents-integrations-vector-stores-s3vectors</artifactId>
+    <name>Flink Agents : Integrations: Vector Stores: S3 Vectors</name>
+    <packaging>jar</packaging>
 
-    <modules>
-        <module>elasticsearch</module>
-    </modules>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
-</project>
\ No newline at end of file
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3vectors</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>1.3.9</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/integrations/vector-stores/s3vectors/src/main/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStore.java
 
b/integrations/vector-stores/s3vectors/src/main/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStore.java
new file mode 100644
index 0000000..833010c
--- /dev/null
+++ 
b/integrations/vector-stores/s3vectors/src/main/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStore.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.s3vectors;
+
+import org.apache.flink.agents.api.RetryExecutor;
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3vectors.S3VectorsClient;
+import software.amazon.awssdk.services.s3vectors.model.DeleteVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.GetOutputVector;
+import software.amazon.awssdk.services.s3vectors.model.GetVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.GetVectorsResponse;
+import software.amazon.awssdk.services.s3vectors.model.PutInputVector;
+import software.amazon.awssdk.services.s3vectors.model.PutVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.QueryOutputVector;
+import software.amazon.awssdk.services.s3vectors.model.QueryVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.QueryVectorsResponse;
+import software.amazon.awssdk.services.s3vectors.model.VectorData;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+/**
+ * Amazon S3 Vectors vector store for flink-agents.
+ *
+ * <p>Uses the S3 Vectors SDK for 
PutVectors/QueryVectors/GetVectors/DeleteVectors. PutVectors calls
+ * are chunked at 500 vectors per request (API limit).
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ *   <li><b>embedding_model</b> (required): name of the embedding model 
resource
+ *   <li><b>vector_bucket</b> (required): S3 Vectors bucket name
+ *   <li><b>vector_index</b> (required): S3 Vectors index name
+ *   <li><b>region</b> (optional): AWS region (default: us-east-1)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @VectorStore
+ * public static ResourceDescriptor s3VectorsStore() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+ *             .addInitialArgument("embedding_model", "bedrockEmbeddingSetup")
+ *             .addInitialArgument("vector_bucket", "my-vector-bucket")
+ *             .addInitialArgument("vector_index", "my-index")
+ *             .addInitialArgument("region", "us-east-1")
+ *             .build();
+ * }
+ * }</pre>
+ */
+public class S3VectorsVectorStore extends BaseVectorStore {
+
+    private static final int MAX_PUT_VECTORS_BATCH = 500;
+
+    private final S3VectorsClient client;
+    private final String vectorBucket;
+    private final String vectorIndex;
+    private final RetryExecutor retryExecutor;
+
+    public S3VectorsVectorStore(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
+        super(descriptor, getResource);
+
+        this.vectorBucket = descriptor.getArgument("vector_bucket");
+        if (this.vectorBucket == null || this.vectorBucket.isBlank()) {
+            throw new IllegalArgumentException(
+                    "vector_bucket is required for S3VectorsVectorStore");
+        }
+
+        this.vectorIndex = descriptor.getArgument("vector_index");
+        if (this.vectorIndex == null || this.vectorIndex.isBlank()) {
+            throw new IllegalArgumentException("vector_index is required for 
S3VectorsVectorStore");
+        }
+
+        String regionStr = descriptor.getArgument("region");
+        this.client =
+                S3VectorsClient.builder()
+                        .region(Region.of(regionStr != null ? regionStr : 
"us-east-1"))
+                        
.credentialsProvider(DefaultCredentialsProvider.builder().build())
+                        .build();
+
+        this.retryExecutor =
+                RetryExecutor.builder()
+                        .maxRetries(5)
+                        .initialBackoffMs(200)
+                        .retryablePredicate(S3VectorsVectorStore::isRetryable)
+                        .build();
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.client.close();
+    }
+
+    /**
+     * Batch-embeds all documents in a single call, then delegates to 
addEmbedding.
+     *
+     * <p>TODO: This batch embedding logic is duplicated in 
OpenSearchVectorStore. Consider
+     * extracting to BaseVectorStore in a follow-up.
+     */
+    @Override
+    public List<String> add(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        BaseEmbeddingModelSetup emb =
+                (BaseEmbeddingModelSetup)
+                        this.getResource.apply(this.embeddingModel, 
ResourceType.EMBEDDING_MODEL);
+        List<String> texts = new ArrayList<>();
+        List<Integer> needsEmbedding = new ArrayList<>();
+        for (int i = 0; i < documents.size(); i++) {
+            if (documents.get(i).getEmbedding() == null) {
+                texts.add(documents.get(i).getContent());
+                needsEmbedding.add(i);
+            }
+        }
+        if (!texts.isEmpty()) {
+            List<float[]> embeddings = emb.embed(texts);
+            for (int j = 0; j < needsEmbedding.size(); j++) {
+                
documents.get(needsEmbedding.get(j)).setEmbedding(embeddings.get(j));
+            }
+        }
+        return this.addEmbedding(documents, collection, extraArgs);
+    }
+
+    @Override
+    public Map<String, Object> getStoreKwargs() {
+        Map<String, Object> m = new HashMap<>();
+        m.put("vector_bucket", vectorBucket);
+        m.put("vector_index", vectorIndex);
+        return m;
+    }
+
+    /**
+     * S3 Vectors does not support a count operation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public long size(@Nullable String collection) {
+        throw new UnsupportedOperationException("S3 Vectors does not support 
count operations");
+    }
+
+    @Override
+    public List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        if (ids == null || ids.isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "S3 Vectors get-all is not implemented; explicit ids are 
required.");
+        }
+        String idx = collection != null ? collection : vectorIndex;
+
+        GetVectorsResponse response =
+                client.getVectors(
+                        GetVectorsRequest.builder()
+                                .vectorBucketName(vectorBucket)
+                                .indexName(idx)
+                                .keys(ids)
+                                .returnMetadata(true)
+                                .build());
+
+        List<Document> docs = new ArrayList<>();
+        for (GetOutputVector v : response.vectors()) {
+            docs.add(toDocument(v.key(), v.metadata()));
+        }
+        return docs;
+    }
+
+    @Override
+    public void delete(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        if (ids == null || ids.isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "S3 Vectors delete-all is not implemented; explicit ids 
are required.");
+        }
+        String idx = collection != null ? collection : vectorIndex;
+        client.deleteVectors(
+                DeleteVectorsRequest.builder()
+                        .vectorBucketName(vectorBucket)
+                        .indexName(idx)
+                        .keys(ids)
+                        .build());
+    }
+
+    @Override
+    protected List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, 
Map<String, Object> args) {
+        try {
+            String idx = collection != null ? collection : vectorIndex;
+            int topK = (int) args.getOrDefault("top_k", Math.max(1, limit));
+
+            List<Float> queryVector = new ArrayList<>(embedding.length);
+            for (float v : embedding) {
+                queryVector.add(v);
+            }
+
+            QueryVectorsResponse response =
+                    client.queryVectors(
+                            QueryVectorsRequest.builder()
+                                    .vectorBucketName(vectorBucket)
+                                    .indexName(idx)
+                                    
.queryVector(VectorData.fromFloat32(queryVector))
+                                    .topK(topK)
+                                    .returnMetadata(true)
+                                    .build());
+
+            List<Document> docs = new ArrayList<>();
+            for (QueryOutputVector v : response.vectors()) {
+                docs.add(toDocument(v.key(), v.metadata()));
+            }
+            return docs;
+        } catch (Exception e) {
+            throw new RuntimeException("S3 Vectors query failed.", e);
+        }
+    }
+
+    @Override
+    protected List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        String idx = collection != null ? collection : vectorIndex;
+        List<String> allKeys = new ArrayList<>();
+
+        List<PutInputVector> allVectors = new ArrayList<>();
+        for (Document doc : documents) {
+            String key = doc.getId() != null ? doc.getId() : 
UUID.randomUUID().toString();
+            allKeys.add(key);
+
+            List<Float> embeddingList = new ArrayList<>();
+            if (doc.getEmbedding() != null) {
+                for (float v : doc.getEmbedding()) {
+                    embeddingList.add(v);
+                }
+            }
+
+            Map<String, software.amazon.awssdk.core.document.Document> metaMap 
=
+                    new LinkedHashMap<>();
+            metaMap.put(
+                    "source_text",
+                    
software.amazon.awssdk.core.document.Document.fromString(doc.getContent()));
+            if (doc.getMetadata() != null) {
+                doc.getMetadata()
+                        .forEach(
+                                (k, v) ->
+                                        metaMap.put(
+                                                k,
+                                                
software.amazon.awssdk.core.document.Document
+                                                        
.fromString(String.valueOf(v))));
+            }
+
+            allVectors.add(
+                    PutInputVector.builder()
+                            .key(key)
+                            .data(VectorData.fromFloat32(embeddingList))
+                            .metadata(
+                                    
software.amazon.awssdk.core.document.Document.fromMap(metaMap))
+                            .build());
+        }
+
+        for (int i = 0; i < allVectors.size(); i += MAX_PUT_VECTORS_BATCH) {
+            List<PutInputVector> batch =
+                    allVectors.subList(i, Math.min(i + MAX_PUT_VECTORS_BATCH, 
allVectors.size()));
+            putVectorsWithRetry(idx, batch);
+        }
+        return allKeys;
+    }
+
+    private void putVectorsWithRetry(String idx, List<PutInputVector> batch) {
+        retryExecutor.execute(
+                () -> {
+                    client.putVectors(
+                            PutVectorsRequest.builder()
+                                    .vectorBucketName(vectorBucket)
+                                    .indexName(idx)
+                                    .vectors(batch)
+                                    .build());
+                    return null;
+                },
+                "S3VectorsPutVectors");
+    }
+
+    private static boolean isRetryable(Exception e) {
+        String msg = e.toString();
+        return msg.contains("ThrottlingException")
+                || msg.contains("ServiceUnavailableException")
+                || msg.contains("429")
+                || msg.contains("503");
+    }
+
+    private Document toDocument(
+            String key, software.amazon.awssdk.core.document.Document 
metadata) {
+        Map<String, Object> metaMap = new HashMap<>();
+        String content = "";
+        if (metadata != null && metadata.isMap()) {
+            metadata.asMap()
+                    .forEach(
+                            (k, v) -> {
+                                if (v.isString()) {
+                                    metaMap.put(k, v.asString());
+                                }
+                            });
+            content = metaMap.getOrDefault("source_text", "").toString();
+        }
+        return new Document(content, metaMap, key);
+    }
+}
diff --git a/integrations/vector-stores/s3vectors/src/test/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStoreTest.java b/integrations/vector-stores/s3vectors/src/test/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStoreTest.java
new file mode 100644
index 0000000..ae8cd0f
--- /dev/null
+++ b/integrations/vector-stores/s3vectors/src/test/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStoreTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.s3vectors;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link S3VectorsVectorStore}.
+ *
+ * <p>Integration tests require an S3 Vectors bucket and index. Set S3V_BUCKET 
env var to run.
+ */
+public class S3VectorsVectorStoreTest {
+
+    private static final BiFunction<String, ResourceType, Resource> NOOP = (a, 
b) -> null;
+
+    @Test
+    @DisplayName("Constructor creates store")
+    void testConstructor() {
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument("vector_bucket", "my-bucket")
+                        .addInitialArgument("vector_index", "my-index")
+                        .addInitialArgument("region", "us-east-1")
+                        .build();
+        S3VectorsVectorStore store = new S3VectorsVectorStore(desc, NOOP);
+        assertThat(store).isInstanceOf(BaseVectorStore.class);
+    }
+
+    @Test
+    @DisplayName("getStoreKwargs returns bucket and index")
+    void testStoreKwargs() {
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument("vector_bucket", "test-bucket")
+                        .addInitialArgument("vector_index", "test-index")
+                        .build();
+        S3VectorsVectorStore store = new S3VectorsVectorStore(desc, NOOP);
+        Map<String, Object> kwargs = store.getStoreKwargs();
+        assertThat(kwargs).containsEntry("vector_bucket", "test-bucket");
+        assertThat(kwargs).containsEntry("vector_index", "test-index");
+    }
+
+    // --- Integration tests (require real S3 Vectors bucket) ---
+
+    private static BaseVectorStore store;
+
+    private static Resource getResource(String name, ResourceType type) {
+        BaseEmbeddingModelSetup emb = 
Mockito.mock(BaseEmbeddingModelSetup.class);
+        Mockito.when(emb.embed("Test document one"))
+                .thenReturn(new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+        Mockito.when(emb.embed("Test document two"))
+                .thenReturn(new float[] {0.5f, 0.4f, 0.3f, 0.2f, 0.1f});
+        Mockito.when(emb.embed(Mockito.anyList()))
+                .thenAnswer(
+                        inv -> {
+                            List<String> texts = inv.getArgument(0);
+                            List<float[]> result = new ArrayList<>();
+                            for (String t : texts) {
+                                result.add(emb.embed(t));
+                            }
+                            return result;
+                        });
+        return emb;
+    }
+
+    @BeforeAll
+    static void initialize() {
+        String bucket = System.getenv("S3V_BUCKET");
+        if (bucket == null) return;
+        ResourceDescriptor desc =
+                
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "emb")
+                        .addInitialArgument("vector_bucket", bucket)
+                        .addInitialArgument(
+                                "vector_index",
+                                System.getenv().getOrDefault("S3V_INDEX", 
"test-index"))
+                        .addInitialArgument(
+                                "region", 
System.getenv().getOrDefault("AWS_REGION", "us-east-1"))
+                        .build();
+        store = new S3VectorsVectorStore(desc, 
S3VectorsVectorStoreTest::getResource);
+    }
+
+    @Test
+    @EnabledIfEnvironmentVariable(named = "S3V_BUCKET", matches = ".+")
+    @DisplayName("Document add and get")
+    void testDocumentAddAndGet() throws Exception {
+        List<Document> docs = new ArrayList<>();
+        docs.add(new Document("Test document one", Map.of("src", "test"), 
"s3v-doc1"));
+        docs.add(new Document("Test document two", Map.of("src", "test"), 
"s3v-doc2"));
+        store.add(docs, null, Collections.emptyMap());
+
+        List<Document> retrieved =
+                store.get(List.of("s3v-doc1", "s3v-doc2"), null, 
Collections.emptyMap());
+        Assertions.assertEquals(2, retrieved.size());
+
+        store.delete(List.of("s3v-doc1", "s3v-doc2"), null, 
Collections.emptyMap());
+    }
+}

Reply via email to