This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 862ed2b [FLINK-AGENTS-524] Add Amazon OpenSearch and S3 Vectors
vector store integrations (#533)
862ed2b is described below
commit 862ed2bf186343c46699905be8682400917f7470
Author: Avichay Marciano <[email protected]>
AuthorDate: Thu Mar 5 10:53:26 2026 +0200
[FLINK-AGENTS-524] Add Amazon OpenSearch and S3 Vectors vector store
integrations (#533)
- OpenSearchVectorStore: supports Serverless (AOSS) and Service domains,
IAM and basic auth, SigV4 signing, bulk indexing, collection management for
long-term memory
- S3VectorsVectorStore: PutVectors/QueryVectors/GetVectors/DeleteVectors,
batched puts (500/request limit)
- Retry via unified RetryExecutor with service-specific retryable predicates
---
dist/pom.xml | 10 +
.../vector-stores/{ => opensearch}/pom.xml | 38 +-
.../opensearch/OpenSearchVectorStore.java | 558 +++++++++++++++++++++
.../opensearch/OpenSearchVectorStoreTest.java | 237 +++++++++
integrations/vector-stores/pom.xml | 2 +
integrations/vector-stores/{ => s3vectors}/pom.xml | 33 +-
.../s3vectors/S3VectorsVectorStore.java | 338 +++++++++++++
.../s3vectors/S3VectorsVectorStoreTest.java | 135 +++++
8 files changed, 1335 insertions(+), 16 deletions(-)
diff --git a/dist/pom.xml b/dist/pom.xml
index b4e479a..16ed7fc 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -99,6 +99,16 @@ under the License.
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-agents-integrations-vector-stores-opensearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-agents-integrations-vector-stores-s3vectors</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-mcp</artifactId>
diff --git a/integrations/vector-stores/pom.xml
b/integrations/vector-stores/opensearch/pom.xml
similarity index 50%
copy from integrations/vector-stores/pom.xml
copy to integrations/vector-stores/opensearch/pom.xml
index 7b9612d..6c34b88 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/opensearch/pom.xml
@@ -22,16 +22,38 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-agents-integrations</artifactId>
+ <artifactId>flink-agents-integrations-vector-stores</artifactId>
<version>0.3-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>flink-agents-integrations-vector-stores</artifactId>
- <name>Flink Agents : Integrations: Vector Stores</name>
- <packaging>pom</packaging>
+ <artifactId>flink-agents-integrations-vector-stores-opensearch</artifactId>
+ <name>Flink Agents : Integrations: Vector Stores: OpenSearch</name>
+ <packaging>jar</packaging>
- <modules>
- <module>elasticsearch</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-agents-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
-</project>
\ No newline at end of file
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>1.3.9</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
new file mode 100644
index 0000000..a6fd2a0
--- /dev/null
+++
b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.opensearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.RetryExecutor;
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.HttpExecuteResponse;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+/**
+ * OpenSearch vector store supporting both OpenSearch Serverless (AOSS) and
OpenSearch Service
+ * domains, with IAM (SigV4) or basic auth.
+ *
+ * <p>Implements {@link CollectionManageableVectorStore} for Long-Term Memory
support. Collections
+ * map to OpenSearch indices. Collection metadata is stored in a dedicated
{@code
+ * flink_agents_collection_metadata} index.
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ * <li><b>embedding_model</b> (required): name of the embedding model
resource
+ * <li><b>endpoint</b> (required): OpenSearch endpoint URL
+ * <li><b>index</b> (required): default index name
+ * <li><b>service_type</b> (optional): "serverless" (default) or "domain"
+ * <li><b>auth</b> (optional): "iam" (default) or "basic"
+ * <li><b>username</b> (required if auth=basic): basic auth username
+ * <li><b>password</b> (required if auth=basic): basic auth password
+ * <li><b>vector_field</b> (optional): vector field name (default:
"embedding")
+ * <li><b>content_field</b> (optional): content field name (default:
"content")
+ * <li><b>region</b> (optional): AWS region (default: us-east-1)
+ * <li><b>dims</b> (optional): vector dimensions for index creation
(default: 1024)
+ * <li><b>max_bulk_mb</b> (optional): max bulk payload size in MB (default:
5)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @VectorStore
+ * public static ResourceDescriptor opensearchStore() {
+ * return
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ * .addInitialArgument("embedding_model", "bedrockEmbeddingSetup")
+ * .addInitialArgument("endpoint",
"https://my-domain.us-east-1.es.amazonaws.com")
+ * .addInitialArgument("index", "my-vectors")
+ * .addInitialArgument("service_type", "domain")
+ * .addInitialArgument("auth", "iam")
+ * .addInitialArgument("dims", 1024)
+ * .build();
+ * }
+ * }</pre>
+ */
+public class OpenSearchVectorStore extends BaseVectorStore
+ implements CollectionManageableVectorStore {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final String METADATA_INDEX =
"flink_agents_collection_metadata";
+
+ private static final int DEFAULT_GET_LIMIT = 10000;
+
+ private final String endpoint;
+ private final String index;
+ private final String vectorField;
+ private final String contentField;
+ private final int dims;
+ private final Region region;
+ private final boolean serverless;
+ private final boolean useIamAuth;
+ private final String basicAuthHeader;
+ private final int maxBulkBytes;
+
+ private final SdkHttpClient httpClient;
+ // TODO: Aws4Signer is legacy; migrate to AwsV4HttpSigner from
http-auth-aws in a follow-up.
+ private final Aws4Signer signer;
+ private final DefaultCredentialsProvider credentialsProvider;
+ private final RetryExecutor retryExecutor;
+
+ public OpenSearchVectorStore(
+ ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
+ super(descriptor, getResource);
+
+ this.endpoint = descriptor.getArgument("endpoint");
+ if (this.endpoint == null || this.endpoint.isBlank()) {
+ throw new IllegalArgumentException("endpoint is required for
OpenSearchVectorStore");
+ }
+
+ this.index = descriptor.getArgument("index");
+
+ this.vectorField =
+
Objects.requireNonNullElse(descriptor.getArgument("vector_field"), "embedding");
+ this.contentField =
+
Objects.requireNonNullElse(descriptor.getArgument("content_field"), "content");
+ Integer dimsArg = descriptor.getArgument("dims");
+ this.dims = dimsArg != null ? dimsArg : 1024;
+
+ String regionStr = descriptor.getArgument("region");
+ this.region = Region.of(regionStr != null ? regionStr : "us-east-1");
+
+ String serviceType =
+
Objects.requireNonNullElse(descriptor.getArgument("service_type"),
"serverless");
+ this.serverless = serviceType.equalsIgnoreCase("serverless");
+
+ String auth =
Objects.requireNonNullElse(descriptor.getArgument("auth"), "iam");
+ this.useIamAuth = auth.equalsIgnoreCase("iam");
+
+ if (!useIamAuth) {
+ String username = descriptor.getArgument("username");
+ String password = descriptor.getArgument("password");
+ if (username == null || password == null) {
+ throw new IllegalArgumentException("username and password
required for basic auth");
+ }
+ this.basicAuthHeader =
+ "Basic "
+ + Base64.getEncoder()
+ .encodeToString(
+ (username + ":" + password)
+
.getBytes(StandardCharsets.UTF_8));
+ } else {
+ this.basicAuthHeader = null;
+ }
+
+ this.httpClient = ApacheHttpClient.create();
+ this.signer = Aws4Signer.create();
+ this.credentialsProvider =
DefaultCredentialsProvider.builder().build();
+
+ Integer bulkMb = descriptor.getArgument("max_bulk_mb");
+ this.maxBulkBytes = (bulkMb != null ? bulkMb : 5) * 1024 * 1024;
+
+ this.retryExecutor =
+ RetryExecutor.builder()
+ .maxRetries(5)
+ .initialBackoffMs(200)
+
.retryablePredicate(OpenSearchVectorStore::isRetryableStatus)
+ .build();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.httpClient.close();
+ this.credentialsProvider.close();
+ }
+
+ /**
+ * Batch-embeds all documents in a single call, then delegates to
addEmbedding.
+ *
+ * <p>TODO: This batch embedding logic is duplicated in
S3VectorsVectorStore. Consider
+ * extracting to BaseVectorStore in a follow-up (would also benefit
ElasticsearchVectorStore).
+ */
+ @Override
+ public List<String> add(
+ List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
+ throws IOException {
+ BaseEmbeddingModelSetup emb =
+ (BaseEmbeddingModelSetup)
+ this.getResource.apply(this.embeddingModel,
ResourceType.EMBEDDING_MODEL);
+ List<String> texts = new ArrayList<>();
+ List<Integer> needsEmbedding = new ArrayList<>();
+ for (int i = 0; i < documents.size(); i++) {
+ if (documents.get(i).getEmbedding() == null) {
+ texts.add(documents.get(i).getContent());
+ needsEmbedding.add(i);
+ }
+ }
+ if (!texts.isEmpty()) {
+ List<float[]> embeddings = emb.embed(texts);
+ for (int j = 0; j < needsEmbedding.size(); j++) {
+
documents.get(needsEmbedding.get(j)).setEmbedding(embeddings.get(j));
+ }
+ }
+ return this.addEmbedding(documents, collection, extraArgs);
+ }
+
+ // ---- CollectionManageableVectorStore ----
+
+ @Override
+ public Collection getOrCreateCollection(String name, Map<String, Object>
metadata)
+ throws Exception {
+ String idx = sanitizeIndexName(name);
+ if (!indexExists(idx)) {
+ createKnnIndex(idx);
+ }
+ ensureMetadataIndex();
+ ObjectNode doc = MAPPER.createObjectNode();
+ doc.put("collection_name", name);
+ doc.set("metadata", MAPPER.valueToTree(metadata));
+ executeRequest("PUT", "/" + METADATA_INDEX + "/_doc/" + idx,
doc.toString());
+ executeRequest("POST", "/" + METADATA_INDEX + "/_refresh", null);
+ return new Collection(name, metadata != null ? metadata :
Collections.emptyMap());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection getCollection(String name) throws Exception {
+ String idx = sanitizeIndexName(name);
+ if (!indexExists(idx)) {
+ throw new RuntimeException("Collection " + name + " not found");
+ }
+ try {
+ ensureMetadataIndex();
+ JsonNode resp = executeRequest("GET", "/" + METADATA_INDEX +
"/_doc/" + idx, null);
+ if (resp.has("found") && resp.get("found").asBoolean()) {
+ Map<String, Object> meta =
+
MAPPER.convertValue(resp.path("_source").path("metadata"), Map.class);
+ return new Collection(name, meta != null ? meta :
Collections.emptyMap());
+ }
+ } catch (RuntimeException e) {
+ // metadata index may not exist yet; only ignore 404s
+ if (!e.getMessage().contains("(404)")) {
+ throw e;
+ }
+ }
+ return new Collection(name, Collections.emptyMap());
+ }
+
+ @Override
+ public Collection deleteCollection(String name) throws Exception {
+ String idx = sanitizeIndexName(name);
+ Collection col = getCollection(name);
+ executeRequest("DELETE", "/" + idx, null);
+ try {
+ executeRequest("DELETE", "/" + METADATA_INDEX + "/_doc/" + idx,
null);
+ } catch (RuntimeException e) {
+ // metadata doc may not exist; only ignore 404s
+ if (!e.getMessage().contains("(404)")) {
+ throw e;
+ }
+ }
+ return col;
+ }
+
+ private boolean indexExists(String idx) {
+ try {
+ executeRequest("HEAD", "/" + idx, null);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private void createKnnIndex(String idx) {
+ String body =
+ String.format(
+ "{\"settings\":{\"index\":{\"knn\":true}},"
+ +
"\"mappings\":{\"properties\":{\"%s\":{\"type\":\"knn_vector\","
+ +
"\"dimension\":%d},\"%s\":{\"type\":\"text\"},"
+ + "\"metadata\":{\"type\":\"object\"}}}}",
+ vectorField, dims, contentField);
+ try {
+ executeRequest("PUT", "/" + idx, body);
+ } catch (RuntimeException e) {
+ if (!e.getMessage().contains("resource_already_exists_exception"))
{
+ throw e;
+ }
+ }
+ }
+
+ private void ensureMetadataIndex() {
+ if (!indexExists(METADATA_INDEX)) {
+ try {
+ executeRequest(
+ "PUT",
+ "/" + METADATA_INDEX,
+
"{\"mappings\":{\"properties\":{\"collection_name\":{\"type\":\"keyword\"},"
+ + "\"metadata\":{\"type\":\"object\"}}}}");
+ } catch (RuntimeException e) {
+ if
(!e.getMessage().contains("resource_already_exists_exception")) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ /** Sanitize collection name to valid OpenSearch index name (lowercase, no
special chars). */
+ private String sanitizeIndexName(String name) {
+ return name.toLowerCase(Locale.ROOT)
+ .replaceAll("[^a-z0-9\\-_]", "-")
+ .replaceAll("^[^a-z]+", "a-");
+ }
+
+ // ---- BaseVectorStore ----
+
+ @Override
+ public Map<String, Object> getStoreKwargs() {
+ Map<String, Object> m = new HashMap<>();
+ m.put("index", index);
+ m.put("vector_field", vectorField);
+ return m;
+ }
+
+ @Override
+ public long size(@Nullable String collection) throws Exception {
+ String idx = collection != null ? sanitizeIndexName(collection) :
this.index;
+ JsonNode response = executeRequest("GET", "/" + idx + "/_count", null);
+ return response.get("count").asLong();
+ }
+
+ @Override
+ public List<Document> get(
+ @Nullable List<String> ids, @Nullable String collection,
Map<String, Object> extraArgs)
+ throws IOException {
+ String idx = collection != null ? sanitizeIndexName(collection) :
this.index;
+ if (ids != null && !ids.isEmpty()) {
+ ObjectNode body = MAPPER.createObjectNode();
+ ArrayNode idsArray =
body.putObject("query").putObject("ids").putArray("values");
+ ids.forEach(idsArray::add);
+ body.put("size", ids.size());
+ return parseHits(executeRequest("POST", "/" + idx + "/_search",
body.toString()));
+ }
+ int limit = DEFAULT_GET_LIMIT;
+ if (extraArgs != null && extraArgs.containsKey("limit")) {
+ limit = ((Number) extraArgs.get("limit")).intValue();
+ }
+ return parseHits(
+ executeRequest(
+ "POST",
+ "/" + idx + "/_search",
+ "{\"query\":{\"match_all\":{}},\"size\":" + limit +
"}"));
+ }
+
+ @Override
+ public void delete(
+ @Nullable List<String> ids, @Nullable String collection,
Map<String, Object> extraArgs)
+ throws IOException {
+ String idx = collection != null ? sanitizeIndexName(collection) :
this.index;
+ if (ids != null && !ids.isEmpty()) {
+ ObjectNode body = MAPPER.createObjectNode();
+ ArrayNode idsArray =
body.putObject("query").putObject("ids").putArray("values");
+ ids.forEach(idsArray::add);
+ executeRequest("POST", "/" + idx + "/_delete_by_query",
body.toString());
+ } else {
+ executeRequest(
+ "POST", "/" + idx + "/_delete_by_query",
"{\"query\":{\"match_all\":{}}}");
+ }
+ executeRequest("POST", "/" + idx + "/_refresh", null);
+ }
+
+ @Override
+ protected List<Document> queryEmbedding(
+ float[] embedding, int limit, @Nullable String collection,
Map<String, Object> args) {
+ try {
+ String idx = collection != null ? sanitizeIndexName(collection) :
this.index;
+ int k = (int) args.getOrDefault("k", Math.max(1, limit));
+
+ ObjectNode body = MAPPER.createObjectNode();
+ body.put("size", k);
+ ObjectNode knnQuery = body.putObject("query").putObject("knn");
+ ObjectNode fieldQuery = knnQuery.putObject(vectorField);
+ ArrayNode vectorArray = fieldQuery.putArray("vector");
+ for (float v : embedding) {
+ vectorArray.add(v);
+ }
+ fieldQuery.put("k", k);
+ if (args.containsKey("min_score")) {
+ fieldQuery.put("min_score", ((Number)
args.get("min_score")).floatValue());
+ }
+ if (args.containsKey("ef_search")) {
+ fieldQuery
+ .putObject("method_parameters")
+ .put("ef_search", ((Number)
args.get("ef_search")).intValue());
+ }
+ if (args.containsKey("filter_query")) {
+ fieldQuery.set("filter", MAPPER.readTree((String)
args.get("filter_query")));
+ }
+
+ return parseHits(executeRequest("POST", "/" + idx + "/_search",
body.toString()));
+ } catch (Exception e) {
+ throw new RuntimeException("OpenSearch KNN search failed.", e);
+ }
+ }
+
+ @Override
+ protected List<String> addEmbedding(
+ List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
+ throws IOException {
+ String idx = collection != null ? sanitizeIndexName(collection) :
this.index;
+ if (!indexExists(idx)) {
+ createKnnIndex(idx);
+ }
+ List<String> allIds = new ArrayList<>();
+ StringBuilder bulk = new StringBuilder();
+ int bulkBytes = 0;
+
+ for (Document doc : documents) {
+ String id = doc.getId() != null ? doc.getId() :
UUID.randomUUID().toString();
+ allIds.add(id);
+
+ ObjectNode action = MAPPER.createObjectNode();
+ action.putObject("index").put("_index", idx).put("_id", id);
+ String actionLine = action.toString() + "\n";
+
+ ObjectNode source = MAPPER.createObjectNode();
+ source.put(contentField, doc.getContent());
+ if (doc.getEmbedding() != null) {
+ ArrayNode vec = source.putArray(vectorField);
+ for (float v : doc.getEmbedding()) {
+ vec.add(v);
+ }
+ }
+ if (doc.getMetadata() != null) {
+ source.set("metadata", MAPPER.valueToTree(doc.getMetadata()));
+ }
+ String sourceLine = source.toString() + "\n";
+
+ int entryBytes = actionLine.length() + sourceLine.length();
+
+ if (bulkBytes > 0 && bulkBytes + entryBytes > maxBulkBytes) {
+ executeRequest("POST", "/_bulk", bulk.toString());
+ bulk.setLength(0);
+ bulkBytes = 0;
+ }
+
+ bulk.append(actionLine).append(sourceLine);
+ bulkBytes += entryBytes;
+ }
+
+ if (bulkBytes > 0) {
+ executeRequest("POST", "/_bulk", bulk.toString());
+ }
+ executeRequest("POST", "/" + idx + "/_refresh", null);
+ return allIds;
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Document> parseHits(JsonNode response) {
+ List<Document> docs = new ArrayList<>();
+ JsonNode hits = response.path("hits").path("hits");
+ for (JsonNode hit : hits) {
+ String id = hit.get("_id").asText();
+ JsonNode source = hit.get("_source");
+ String content = source.has(contentField) ?
source.get(contentField).asText() : "";
+ Map<String, Object> metadata = new HashMap<>();
+ if (source.has("metadata")) {
+ metadata = MAPPER.convertValue(source.get("metadata"),
Map.class);
+ }
+ docs.add(new Document(content, metadata, id));
+ }
+ return docs;
+ }
+
+ private JsonNode executeRequest(String method, String path, @Nullable
String body) {
+ return retryExecutor.execute(
+ () -> doExecuteRequest(method, path, body),
"OpenSearchRequest");
+ }
+
+ private static boolean isRetryableStatus(Exception e) {
+ String msg = e.getMessage();
+ return msg != null
+ && (msg.contains("(429)") || msg.contains("(503)") ||
msg.contains("(502)"));
+ }
+
+ private JsonNode doExecuteRequest(String method, String path, @Nullable
String body) {
+ try {
+ URI uri = URI.create(endpoint + path);
+ SdkHttpFullRequest.Builder reqBuilder =
+ SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.valueOf(method))
+ .putHeader("Content-Type", "application/json");
+
+ if (body != null) {
+ reqBuilder.contentStreamProvider(
+ () -> new
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ SdkHttpFullRequest request;
+ if (useIamAuth) {
+ AwsCredentials credentials =
credentialsProvider.resolveCredentials();
+ Aws4SignerParams signerParams =
+ Aws4SignerParams.builder()
+ .awsCredentials(credentials)
+ .signingName(serverless ? "aoss" : "es")
+ .signingRegion(region)
+ .build();
+ request = signer.sign(reqBuilder.build(), signerParams);
+ } else {
+ request = reqBuilder.putHeader("Authorization",
basicAuthHeader).build();
+ }
+
+ HttpExecuteRequest.Builder execBuilder =
HttpExecuteRequest.builder().request(request);
+ if (request.contentStreamProvider().isPresent()) {
+
execBuilder.contentStreamProvider(request.contentStreamProvider().get());
+ }
+
+ HttpExecuteResponse response =
httpClient.prepareRequest(execBuilder.build()).call();
+ int statusCode = response.httpResponse().statusCode();
+
+ if ("HEAD".equals(method)) {
+ if (statusCode >= 400) {
+ throw new RuntimeException(
+ "OpenSearch HEAD request failed (" + statusCode +
")");
+ }
+ return MAPPER.createObjectNode().put("status", statusCode);
+ }
+
+ String responseBody = new
String(response.responseBody().orElseThrow().readAllBytes());
+
+ if (statusCode >= 400) {
+ throw new RuntimeException(
+ "OpenSearch request failed (" + statusCode + "): " +
responseBody);
+ }
+ return MAPPER.readTree(responseBody);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("OpenSearch request failed.", e);
+ }
+ }
+}
diff --git
a/integrations/vector-stores/opensearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStoreTest.java
b/integrations/vector-stores/opensearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStoreTest.java
new file mode 100644
index 0000000..6e99cc0
--- /dev/null
+++
b/integrations/vector-stores/opensearch/src/test/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStoreTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.opensearch;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link OpenSearchVectorStore}.
+ *
+ * <p>Integration tests require an OpenSearch Serverless collection or domain.
Set
+ * OPENSEARCH_ENDPOINT environment variable to run.
+ */
+public class OpenSearchVectorStoreTest {
+
+ private static final BiFunction<String, ResourceType, Resource> NOOP = (a,
b) -> null;
+
+ @Test
+ @DisplayName("Constructor creates store with IAM auth")
+ void testConstructorIamAuth() {
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument(
+ "endpoint",
"https://example.aoss.us-east-1.amazonaws.com")
+ .addInitialArgument("index", "test-index")
+ .addInitialArgument("region", "us-east-1")
+ .addInitialArgument("service_type", "serverless")
+ .addInitialArgument("auth", "iam")
+ .build();
+ OpenSearchVectorStore store = new OpenSearchVectorStore(desc, NOOP);
+ assertThat(store).isInstanceOf(BaseVectorStore.class);
+ assertThat(store).isInstanceOf(CollectionManageableVectorStore.class);
+ }
+
+ @Test
+ @DisplayName("Constructor creates store with basic auth")
+ void testConstructorBasicAuth() {
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument(
+ "endpoint",
"https://my-domain.us-east-1.es.amazonaws.com")
+ .addInitialArgument("index", "test-index")
+ .addInitialArgument("region", "us-east-1")
+ .addInitialArgument("service_type", "domain")
+ .addInitialArgument("auth", "basic")
+ .addInitialArgument("username", "admin")
+ .addInitialArgument("password", "password")
+ .build();
+ OpenSearchVectorStore store = new OpenSearchVectorStore(desc, NOOP);
+ assertThat(store).isInstanceOf(BaseVectorStore.class);
+ }
+
+ @Test
+ @DisplayName("Constructor with custom max_bulk_mb")
+ void testConstructorCustomBulkSize() {
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument(
+ "endpoint",
"https://example.aoss.us-east-1.amazonaws.com")
+ .addInitialArgument("index", "test-index")
+ .addInitialArgument("max_bulk_mb", 10)
+ .build();
+ OpenSearchVectorStore store = new OpenSearchVectorStore(desc, NOOP);
+ assertThat(store.getStoreKwargs()).containsEntry("index",
"test-index");
+ }
+
+ @Test
+ @DisplayName("Basic auth requires username and password")
+ void testBasicAuthRequiresCredentials() {
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument("endpoint", "https://example.com")
+ .addInitialArgument("index", "test")
+ .addInitialArgument("auth", "basic")
+ .build();
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () -> new
OpenSearchVectorStore(desc, NOOP));
+ }
+
+ // --- Integration tests (require real OpenSearch) ---
+
+ private static BaseVectorStore store;
+
+ private static Resource getResource(String name, ResourceType type) {
+ BaseEmbeddingModelSetup emb =
Mockito.mock(BaseEmbeddingModelSetup.class);
+ Mockito.when(emb.embed("OpenSearch is a search engine"))
+ .thenReturn(new float[] {0.2f, 0.3f, 0.4f, 0.5f, 0.6f});
+ Mockito.when(emb.embed("Flink Agents is an AI framework"))
+ .thenReturn(new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+ Mockito.when(emb.embed("search engine"))
+ .thenReturn(new float[] {0.2f, 0.3f, 0.4f, 0.5f, 0.6f});
+ Mockito.when(emb.embed(Mockito.anyList()))
+ .thenAnswer(
+ inv -> {
+ List<String> texts = inv.getArgument(0);
+ List<float[]> result = new ArrayList<>();
+ for (String t : texts) {
+ result.add(emb.embed(t));
+ }
+ return result;
+ });
+ return emb;
+ }
+
+ @BeforeAll
+ static void initialize() {
+ String endpoint = System.getenv("OPENSEARCH_ENDPOINT");
+ if (endpoint == null) return;
+ String auth = System.getenv().getOrDefault("OPENSEARCH_AUTH", "iam");
+ ResourceDescriptor.Builder builder =
+
ResourceDescriptor.Builder.newBuilder(OpenSearchVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument("endpoint", endpoint)
+ .addInitialArgument("index", "test-opensearch")
+ .addInitialArgument("dims", 5)
+ .addInitialArgument(
+ "region",
System.getenv().getOrDefault("AWS_REGION", "us-east-1"))
+ .addInitialArgument(
+ "service_type",
+ System.getenv()
+
.getOrDefault("OPENSEARCH_SERVICE_TYPE", "serverless"))
+ .addInitialArgument("auth", auth);
+ if ("basic".equals(auth)) {
+ builder.addInitialArgument("username",
System.getenv("OPENSEARCH_USERNAME"));
+ builder.addInitialArgument("password",
System.getenv("OPENSEARCH_PASSWORD"));
+ }
+ store = new OpenSearchVectorStore(builder.build(),
OpenSearchVectorStoreTest::getResource);
+ }
+
+ @Test
+ @EnabledIfEnvironmentVariable(named = "OPENSEARCH_ENDPOINT", matches =
".+")
+ @DisplayName("Collection management: create, get, delete")
+ void testCollectionManagement() throws Exception {
+ CollectionManageableVectorStore vs = (CollectionManageableVectorStore)
store;
+ String name = "test_collection";
+ Map<String, Object> metadata = Map.of("key1", "value1");
+ vs.getOrCreateCollection(name, metadata);
+
+ CollectionManageableVectorStore.Collection col =
vs.getCollection(name);
+ Assertions.assertNotNull(col);
+ Assertions.assertEquals(name, col.getName());
+
+ vs.deleteCollection(name);
+ }
+
+ @Test
+ @EnabledIfEnvironmentVariable(named = "OPENSEARCH_ENDPOINT", matches =
".+")
+ @DisplayName("Document management: add, get, delete")
+ void testDocumentManagement() throws Exception {
+ String name = "test_docs";
+ ((CollectionManageableVectorStore) store).getOrCreateCollection(name,
Map.of());
+
+ List<Document> docs = new ArrayList<>();
+ docs.add(new Document("OpenSearch is a search engine", Map.of("src",
"test"), "doc1"));
+ docs.add(new Document("Flink Agents is an AI framework", Map.of("src",
"test"), "doc2"));
+ store.add(docs, name, Collections.emptyMap());
+ Thread.sleep(1000);
+
+ List<Document> all = store.get(null, name, Collections.emptyMap());
+ Assertions.assertEquals(2, all.size());
+
+ store.delete(Collections.singletonList("doc1"), name,
Collections.emptyMap());
+ Thread.sleep(1000);
+ List<Document> remaining = store.get(null, name,
Collections.emptyMap());
+ Assertions.assertEquals(1, remaining.size());
+
+ ((CollectionManageableVectorStore) store).deleteCollection(name);
+ }
+
+ @Test
+ @EnabledIfEnvironmentVariable(named = "OPENSEARCH_ENDPOINT", matches =
".+")
+ @DisplayName("Query with filter_query restricts results")
+ void testQueryWithFilter() throws Exception {
+ String name = "test_filter";
+ ((CollectionManageableVectorStore) store).getOrCreateCollection(name,
Map.of());
+
+ List<Document> docs = new ArrayList<>();
+ docs.add(new Document("OpenSearch is a search engine", Map.of("src",
"web"), "f1"));
+ docs.add(new Document("Flink Agents is an AI framework", Map.of("src",
"code"), "f2"));
+ store.add(docs, name, Collections.emptyMap());
+ Thread.sleep(1000);
+
+ // Query with filter: only src=web
+ VectorStoreQuery q =
+ new VectorStoreQuery(
+ "search engine",
+ 5,
+ name,
+ Map.of("filter_query",
"{\"term\":{\"metadata.src.keyword\":\"web\"}}"));
+ List<Document> results = store.query(q).getDocuments();
+ Assertions.assertFalse(results.isEmpty());
+ Assertions.assertTrue(
+ results.stream().allMatch(d ->
"web".equals(d.getMetadata().get("src"))));
+
+ ((CollectionManageableVectorStore) store).deleteCollection(name);
+ }
+}
diff --git a/integrations/vector-stores/pom.xml
b/integrations/vector-stores/pom.xml
index 7b9612d..4d4766d 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/pom.xml
@@ -32,6 +32,8 @@ under the License.
<modules>
<module>elasticsearch</module>
+ <module>opensearch</module>
+ <module>s3vectors</module>
</modules>
</project>
\ No newline at end of file
diff --git a/integrations/vector-stores/pom.xml
b/integrations/vector-stores/s3vectors/pom.xml
similarity index 55%
copy from integrations/vector-stores/pom.xml
copy to integrations/vector-stores/s3vectors/pom.xml
index 7b9612d..64fbf87 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/s3vectors/pom.xml
@@ -22,16 +22,33 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-agents-integrations</artifactId>
+ <artifactId>flink-agents-integrations-vector-stores</artifactId>
<version>0.3-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>flink-agents-integrations-vector-stores</artifactId>
- <name>Flink Agents : Integrations: Vector Stores</name>
- <packaging>pom</packaging>
+ <artifactId>flink-agents-integrations-vector-stores-s3vectors</artifactId>
+ <name>Flink Agents : Integrations: Vector Stores: S3 Vectors</name>
+ <packaging>jar</packaging>
- <modules>
- <module>elasticsearch</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-agents-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
-</project>
\ No newline at end of file
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3vectors</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>1.3.9</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/integrations/vector-stores/s3vectors/src/main/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStore.java
b/integrations/vector-stores/s3vectors/src/main/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStore.java
new file mode 100644
index 0000000..833010c
--- /dev/null
+++
b/integrations/vector-stores/s3vectors/src/main/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStore.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.s3vectors;
+
+import org.apache.flink.agents.api.RetryExecutor;
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3vectors.S3VectorsClient;
+import software.amazon.awssdk.services.s3vectors.model.DeleteVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.GetOutputVector;
+import software.amazon.awssdk.services.s3vectors.model.GetVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.GetVectorsResponse;
+import software.amazon.awssdk.services.s3vectors.model.PutInputVector;
+import software.amazon.awssdk.services.s3vectors.model.PutVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.QueryOutputVector;
+import software.amazon.awssdk.services.s3vectors.model.QueryVectorsRequest;
+import software.amazon.awssdk.services.s3vectors.model.QueryVectorsResponse;
+import software.amazon.awssdk.services.s3vectors.model.VectorData;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+/**
+ * Amazon S3 Vectors vector store for flink-agents.
+ *
+ * <p>Uses the S3 Vectors SDK for
PutVectors/QueryVectors/GetVectors/DeleteVectors. PutVectors calls
+ * are chunked at 500 vectors per request (API limit).
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ * <li><b>embedding_model</b> (required): name of the embedding model
resource
+ * <li><b>vector_bucket</b> (required): S3 Vectors bucket name
+ * <li><b>vector_index</b> (required): S3 Vectors index name
+ * <li><b>region</b> (optional): AWS region (default: us-east-1)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @VectorStore
+ * public static ResourceDescriptor s3VectorsStore() {
+ * return
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+ * .addInitialArgument("embedding_model", "bedrockEmbeddingSetup")
+ * .addInitialArgument("vector_bucket", "my-vector-bucket")
+ * .addInitialArgument("vector_index", "my-index")
+ * .addInitialArgument("region", "us-east-1")
+ * .build();
+ * }
+ * }</pre>
+ */
+public class S3VectorsVectorStore extends BaseVectorStore {
+
+ private static final int MAX_PUT_VECTORS_BATCH = 500;
+
+ private final S3VectorsClient client;
+ private final String vectorBucket;
+ private final String vectorIndex;
+ private final RetryExecutor retryExecutor;
+
+ public S3VectorsVectorStore(
+ ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
+ super(descriptor, getResource);
+
+ this.vectorBucket = descriptor.getArgument("vector_bucket");
+ if (this.vectorBucket == null || this.vectorBucket.isBlank()) {
+ throw new IllegalArgumentException(
+ "vector_bucket is required for S3VectorsVectorStore");
+ }
+
+ this.vectorIndex = descriptor.getArgument("vector_index");
+ if (this.vectorIndex == null || this.vectorIndex.isBlank()) {
+ throw new IllegalArgumentException("vector_index is required for
S3VectorsVectorStore");
+ }
+
+ String regionStr = descriptor.getArgument("region");
+ this.client =
+ S3VectorsClient.builder()
+ .region(Region.of(regionStr != null ? regionStr :
"us-east-1"))
+
.credentialsProvider(DefaultCredentialsProvider.builder().build())
+ .build();
+
+ this.retryExecutor =
+ RetryExecutor.builder()
+ .maxRetries(5)
+ .initialBackoffMs(200)
+ .retryablePredicate(S3VectorsVectorStore::isRetryable)
+ .build();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.client.close();
+ }
+
+ /**
+ * Batch-embeds all documents in a single call, then delegates to
addEmbedding.
+ *
+ * <p>TODO: This batch embedding logic is duplicated in
OpenSearchVectorStore. Consider
+ * extracting to BaseVectorStore in a follow-up.
+ */
+ @Override
+ public List<String> add(
+ List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
+ throws IOException {
+ BaseEmbeddingModelSetup emb =
+ (BaseEmbeddingModelSetup)
+ this.getResource.apply(this.embeddingModel,
ResourceType.EMBEDDING_MODEL);
+ List<String> texts = new ArrayList<>();
+ List<Integer> needsEmbedding = new ArrayList<>();
+ for (int i = 0; i < documents.size(); i++) {
+ if (documents.get(i).getEmbedding() == null) {
+ texts.add(documents.get(i).getContent());
+ needsEmbedding.add(i);
+ }
+ }
+ if (!texts.isEmpty()) {
+ List<float[]> embeddings = emb.embed(texts);
+ for (int j = 0; j < needsEmbedding.size(); j++) {
+
documents.get(needsEmbedding.get(j)).setEmbedding(embeddings.get(j));
+ }
+ }
+ return this.addEmbedding(documents, collection, extraArgs);
+ }
+
+ @Override
+ public Map<String, Object> getStoreKwargs() {
+ Map<String, Object> m = new HashMap<>();
+ m.put("vector_bucket", vectorBucket);
+ m.put("vector_index", vectorIndex);
+ return m;
+ }
+
+ /**
+ * S3 Vectors does not support a count operation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public long size(@Nullable String collection) {
+ throw new UnsupportedOperationException("S3 Vectors does not support
count operations");
+ }
+
+ @Override
+ public List<Document> get(
+ @Nullable List<String> ids, @Nullable String collection,
Map<String, Object> extraArgs)
+ throws IOException {
+ if (ids == null || ids.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "S3 Vectors get-all is not implemented; explicit ids are
required.");
+ }
+ String idx = collection != null ? collection : vectorIndex;
+
+ GetVectorsResponse response =
+ client.getVectors(
+ GetVectorsRequest.builder()
+ .vectorBucketName(vectorBucket)
+ .indexName(idx)
+ .keys(ids)
+ .returnMetadata(true)
+ .build());
+
+ List<Document> docs = new ArrayList<>();
+ for (GetOutputVector v : response.vectors()) {
+ docs.add(toDocument(v.key(), v.metadata()));
+ }
+ return docs;
+ }
+
+ @Override
+ public void delete(
+ @Nullable List<String> ids, @Nullable String collection,
Map<String, Object> extraArgs)
+ throws IOException {
+ if (ids == null || ids.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "S3 Vectors delete-all is not implemented; explicit ids
are required.");
+ }
+ String idx = collection != null ? collection : vectorIndex;
+ client.deleteVectors(
+ DeleteVectorsRequest.builder()
+ .vectorBucketName(vectorBucket)
+ .indexName(idx)
+ .keys(ids)
+ .build());
+ }
+
+ @Override
+ protected List<Document> queryEmbedding(
+ float[] embedding, int limit, @Nullable String collection,
Map<String, Object> args) {
+ try {
+ String idx = collection != null ? collection : vectorIndex;
+ int topK = (int) args.getOrDefault("top_k", Math.max(1, limit));
+
+ List<Float> queryVector = new ArrayList<>(embedding.length);
+ for (float v : embedding) {
+ queryVector.add(v);
+ }
+
+ QueryVectorsResponse response =
+ client.queryVectors(
+ QueryVectorsRequest.builder()
+ .vectorBucketName(vectorBucket)
+ .indexName(idx)
+
.queryVector(VectorData.fromFloat32(queryVector))
+ .topK(topK)
+ .returnMetadata(true)
+ .build());
+
+ List<Document> docs = new ArrayList<>();
+ for (QueryOutputVector v : response.vectors()) {
+ docs.add(toDocument(v.key(), v.metadata()));
+ }
+ return docs;
+ } catch (Exception e) {
+ throw new RuntimeException("S3 Vectors query failed.", e);
+ }
+ }
+
+ @Override
+ protected List<String> addEmbedding(
+ List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
+ throws IOException {
+ String idx = collection != null ? collection : vectorIndex;
+ List<String> allKeys = new ArrayList<>();
+
+ List<PutInputVector> allVectors = new ArrayList<>();
+ for (Document doc : documents) {
+ String key = doc.getId() != null ? doc.getId() :
UUID.randomUUID().toString();
+ allKeys.add(key);
+
+ List<Float> embeddingList = new ArrayList<>();
+ if (doc.getEmbedding() != null) {
+ for (float v : doc.getEmbedding()) {
+ embeddingList.add(v);
+ }
+ }
+
+ Map<String, software.amazon.awssdk.core.document.Document> metaMap
=
+ new LinkedHashMap<>();
+ metaMap.put(
+ "source_text",
+
software.amazon.awssdk.core.document.Document.fromString(doc.getContent()));
+ if (doc.getMetadata() != null) {
+ doc.getMetadata()
+ .forEach(
+ (k, v) ->
+ metaMap.put(
+ k,
+
software.amazon.awssdk.core.document.Document
+
.fromString(String.valueOf(v))));
+ }
+
+ allVectors.add(
+ PutInputVector.builder()
+ .key(key)
+ .data(VectorData.fromFloat32(embeddingList))
+ .metadata(
+
software.amazon.awssdk.core.document.Document.fromMap(metaMap))
+ .build());
+ }
+
+ for (int i = 0; i < allVectors.size(); i += MAX_PUT_VECTORS_BATCH) {
+ List<PutInputVector> batch =
+ allVectors.subList(i, Math.min(i + MAX_PUT_VECTORS_BATCH,
allVectors.size()));
+ putVectorsWithRetry(idx, batch);
+ }
+ return allKeys;
+ }
+
+ private void putVectorsWithRetry(String idx, List<PutInputVector> batch) {
+ retryExecutor.execute(
+ () -> {
+ client.putVectors(
+ PutVectorsRequest.builder()
+ .vectorBucketName(vectorBucket)
+ .indexName(idx)
+ .vectors(batch)
+ .build());
+ return null;
+ },
+ "S3VectorsPutVectors");
+ }
+
+ private static boolean isRetryable(Exception e) {
+ String msg = e.toString();
+ return msg.contains("ThrottlingException")
+ || msg.contains("ServiceUnavailableException")
+ || msg.contains("429")
+ || msg.contains("503");
+ }
+
+ private Document toDocument(
+ String key, software.amazon.awssdk.core.document.Document
metadata) {
+ Map<String, Object> metaMap = new HashMap<>();
+ String content = "";
+ if (metadata != null && metadata.isMap()) {
+ metadata.asMap()
+ .forEach(
+ (k, v) -> {
+ if (v.isString()) {
+ metaMap.put(k, v.asString());
+ }
+ });
+ content = metaMap.getOrDefault("source_text", "").toString();
+ }
+ return new Document(content, metaMap, key);
+ }
+}
diff --git
a/integrations/vector-stores/s3vectors/src/test/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStoreTest.java
b/integrations/vector-stores/s3vectors/src/test/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStoreTest.java
new file mode 100644
index 0000000..ae8cd0f
--- /dev/null
+++
b/integrations/vector-stores/s3vectors/src/test/java/org/apache/flink/agents/integrations/vectorstores/s3vectors/S3VectorsVectorStoreTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.s3vectors;
+
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link S3VectorsVectorStore}.
+ *
+ * <p>Integration tests require an S3 Vectors bucket and index. Set S3V_BUCKET
env var to run.
+ */
+public class S3VectorsVectorStoreTest {
+
+ private static final BiFunction<String, ResourceType, Resource> NOOP = (a,
b) -> null;
+
+ @Test
+ @DisplayName("Constructor creates store")
+ void testConstructor() {
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument("vector_bucket", "my-bucket")
+ .addInitialArgument("vector_index", "my-index")
+ .addInitialArgument("region", "us-east-1")
+ .build();
+ S3VectorsVectorStore store = new S3VectorsVectorStore(desc, NOOP);
+ assertThat(store).isInstanceOf(BaseVectorStore.class);
+ }
+
+ @Test
+ @DisplayName("getStoreKwargs returns bucket and index")
+ void testStoreKwargs() {
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument("vector_bucket", "test-bucket")
+ .addInitialArgument("vector_index", "test-index")
+ .build();
+ S3VectorsVectorStore store = new S3VectorsVectorStore(desc, NOOP);
+ Map<String, Object> kwargs = store.getStoreKwargs();
+ assertThat(kwargs).containsEntry("vector_bucket", "test-bucket");
+ assertThat(kwargs).containsEntry("vector_index", "test-index");
+ }
+
+ // --- Integration tests (require real S3 Vectors bucket) ---
+
+ private static BaseVectorStore store;
+
+ private static Resource getResource(String name, ResourceType type) {
+ BaseEmbeddingModelSetup emb =
Mockito.mock(BaseEmbeddingModelSetup.class);
+ Mockito.when(emb.embed("Test document one"))
+ .thenReturn(new float[] {0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
+ Mockito.when(emb.embed("Test document two"))
+ .thenReturn(new float[] {0.5f, 0.4f, 0.3f, 0.2f, 0.1f});
+ Mockito.when(emb.embed(Mockito.anyList()))
+ .thenAnswer(
+ inv -> {
+ List<String> texts = inv.getArgument(0);
+ List<float[]> result = new ArrayList<>();
+ for (String t : texts) {
+ result.add(emb.embed(t));
+ }
+ return result;
+ });
+ return emb;
+ }
+
+ @BeforeAll
+ static void initialize() {
+ String bucket = System.getenv("S3V_BUCKET");
+ if (bucket == null) return;
+ ResourceDescriptor desc =
+
ResourceDescriptor.Builder.newBuilder(S3VectorsVectorStore.class.getName())
+ .addInitialArgument("embedding_model", "emb")
+ .addInitialArgument("vector_bucket", bucket)
+ .addInitialArgument(
+ "vector_index",
+ System.getenv().getOrDefault("S3V_INDEX",
"test-index"))
+ .addInitialArgument(
+ "region",
System.getenv().getOrDefault("AWS_REGION", "us-east-1"))
+ .build();
+ store = new S3VectorsVectorStore(desc,
S3VectorsVectorStoreTest::getResource);
+ }
+
+ @Test
+ @EnabledIfEnvironmentVariable(named = "S3V_BUCKET", matches = ".+")
+ @DisplayName("Document add and get")
+ void testDocumentAddAndGet() throws Exception {
+ List<Document> docs = new ArrayList<>();
+ docs.add(new Document("Test document one", Map.of("src", "test"),
"s3v-doc1"));
+ docs.add(new Document("Test document two", Map.of("src", "test"),
"s3v-doc2"));
+ store.add(docs, null, Collections.emptyMap());
+
+ List<Document> retrieved =
+ store.get(List.of("s3v-doc1", "s3v-doc2"), null,
Collections.emptyMap());
+ Assertions.assertEquals(2, retrieved.size());
+
+ store.delete(List.of("s3v-doc1", "s3v-doc2"), null,
Collections.emptyMap());
+ }
+}