This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 2d2b5628 [Feature][Integration][Java] Add Milvus vector store integration (#663)
2d2b5628 is described below
commit 2d2b56283e6e88aae45aa1ef7a313da42a6dc5c0
Author: WAR10CK <[email protected]>
AuthorDate: Mon May 18 11:41:59 2026 +0800
[Feature][Integration][Java] Add Milvus vector store integration (#663)
Signed-off-by: YangYanbin <[email protected]>
---
.github/workflows/ci.yml | 34 +-
.../flink/agents/api/resource/ResourceName.java | 4 +
dist/pom.xml | 7 +-
docs/content/docs/development/vector_stores.md | 83 +-
docs/content/docs/faq/faq.md | 3 +-
.../pom.xml | 9 +-
.../resource/test/Mem0LongTermMemoryAgent.java | 34 +-
.../resource/test/Mem0LongTermMemoryTest.java | 40 +-
.../test/VectorStoreCrossLanguageAgent.java | 18 +-
integrations/pom.xml | 1 +
integrations/{ => vector-stores/milvus}/pom.xml | 40 +-
.../vectorstores/milvus/MilvusVectorStore.java | 1194 ++++++++++++++++++++
.../vectorstores/milvus/MilvusVectorStoreTest.java | 642 +++++++++++
integrations/vector-stores/pom.xml | 3 +-
python/flink_agents/api/resource.py | 3 +
.../vector_store_cross_language_agent.py | 122 +-
.../vector_store_cross_language_test.py | 49 +-
.../flink_agents/runtime/java/java_vector_store.py | 4 +
tools/docker/elasticsearch/docker-compose.yml | 46 +
tools/docker/milvus/docker-compose.yml | 98 ++
20 files changed, 2343 insertions(+), 91 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 4001aee9..484f0e48 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -222,21 +222,6 @@ jobs:
runs-on: ${{ matrix.os }}
env:
SKIP_SPOTLESS_CHECK: true
- services:
- elasticsearch:
- image: docker.elastic.co/elasticsearch/elasticsearch:8.19.0
- env:
- discovery.type: single-node
- xpack.security.enabled: false
- ES_JAVA_OPTS: "-Xms512m -Xmx512m"
- ports:
- - 9200:9200
- options: >-
- --health-cmd "curl -f http://localhost:9200/_cluster/health || exit
1"
- --health-interval 10s
- --health-timeout 5s
- --health-retries 10
- --health-start-period 30s
strategy:
fail-fast: false
matrix:
@@ -273,9 +258,26 @@ jobs:
run: bash tools/build.sh
- name: Install ollama
run: bash tools/start_ollama_server.sh
+ - name: Start Elasticsearch
+ run: |
+ docker compose -f tools/docker/elasticsearch/docker-compose.yml down
-v
+ docker compose -f tools/docker/elasticsearch/docker-compose.yml up -d
+ timeout 180 bash -c 'until curl -fsS
http://localhost:9200/_cluster/health; do sleep 5; done'
+ - name: Start Milvus
+ run: |
+ docker compose -f tools/docker/milvus/docker-compose.yml down -v
+ docker compose -f tools/docker/milvus/docker-compose.yml up -d
+ timeout 180 bash -c 'until curl -fsS http://localhost:9091/healthz;
do sleep 5; done'
- name: Run e2e tests
env:
LOG_LEVEL: INFO
run: |
export ES_HOST="http://localhost:9200"
- tools/e2e.sh
\ No newline at end of file
+ export MILVUS_URI="http://localhost:19530"
+ tools/e2e.sh
+ - name: Stop Milvus
+ if: always()
+ run: docker compose -f tools/docker/milvus/docker-compose.yml down -v
+ - name: Stop Elasticsearch
+ if: always()
+ run: docker compose -f tools/docker/elasticsearch/docker-compose.yml
down -v
diff --git
a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
index 0d6ee413..d6dfa697 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
@@ -191,6 +191,10 @@ public final class ResourceName {
public static final String S3_VECTORS_VECTOR_STORE =
"org.apache.flink.agents.integrations.vectorstores.s3vectors.S3VectorsVectorStore";
+ // Milvus
+ public static final String MILVUS_VECTOR_STORE =
+
"org.apache.flink.agents.integrations.vectorstores.milvus.MilvusVectorStore";
+
// Python Wrapper
public static final String PYTHON_WRAPPER_VECTOR_STORE =
"org.apache.flink.agents.api.vectorstores.python.PythonVectorStore";
diff --git a/dist/pom.xml b/dist/pom.xml
index 40fcc7b1..9e0c0c6c 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -100,6 +100,11 @@ under the License.
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-agents-integrations-vector-stores-milvus</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-vector-stores-opensearch</artifactId>
@@ -156,4 +161,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/docs/content/docs/development/vector_stores.md
b/docs/content/docs/development/vector_stores.md
index decc9c0d..4efcbe0a 100644
--- a/docs/content/docs/development/vector_stores.md
+++ b/docs/content/docs/development/vector_stores.md
@@ -172,7 +172,7 @@ For vector stores that implement
`CollectionManageableVectorStore`, you can crea
* `delete_collection` / `deleteCollection`: Delete a collection by name.
{{< hint info >}}
-Collection-level operations are only supported for vector stores that implement `CollectionManageableVectorStore`. Among the built-in providers, Chroma (Python), Elasticsearch (Java) and OpenSearch (Java) implement this interface.
+Collection-level operations are only supported for vector stores that implement `CollectionManageableVectorStore`. Among the built-in providers, Chroma (Python), Elasticsearch (Java), OpenSearch (Java), and Milvus (Java) implement this interface.
{{< /hint >}}
{{< tabs "Collection level operations" >}}
@@ -642,9 +642,86 @@ public static ResourceDescriptor vectorStore() {
{{< /tabs >}}
+### Milvus
+
+[Milvus](https://milvus.io/) is an open-source vector database designed for high-dimensional vector search at scale.
+
+{{< hint info >}}
+Milvus is currently supported in the Java API only. To use Milvus from Python agents, see [Using Cross-Language Providers](#using-cross-language-providers).
+{{< /hint >}}
+
+#### Prerequisites
+
+1. A running Milvus server reachable from the Flink job (see the `uri`, `host`, and `port` parameters below).
+
+#### MilvusVectorStore Parameters
+
+| Parameter | Type | Default | Description |
+|-----------------------------|------|--------------------------------------|-----------------------------------------------------------------------------|
+| `embedding_model` | str | Required | Reference to embedding model resource name |
+| `collection` | str | `"flink_agents_milvus_collection"` | Default target Milvus collection name |
+| `collection_name` | str | None | Alias for `collection` |
+| `index` | str | None | Alias for `collection`, mainly for cross-provider compatibility |
+| `id_field` | str | `"id"` | Name of the primary key field |
+| `content_field` | str | `"content"` | Name of the field storing document content |
+| `metadata_field` | str | `"metadata"` | Name of the JSON field storing document metadata |
+| `vector_field` | str | `"embedding"` | Name of the FloatVector field used for vector search |
+| `dims` | int | `768` | Vector dimensionality |
+| `id_max_length` | int | `65535` | Maximum length for the VarChar primary key field |
+| `content_max_length` | int | `65535` | Maximum length for the VarChar content field |
+| `metric_type` | str | `"COSINE"` | Milvus metric type used by vector search |
+| `index_type` | str | `"AUTOINDEX"` | Milvus vector index type |
+| `index_params` | map | `{}` | Extra vector index parameters passed to Milvus |
+| `metadata_index_keys` | list | `user_id`, `agent_id`, `run_id`, `actor_id`, `category` | Additional metadata JSON keys indexed with path indexes |
+| `metadata_index_cast_types` | map | Default keys use `"VARCHAR"` | Per-metadata-key JSON path index cast type overrides |
+| `num_shards` | int | `1` | Number of Milvus shards for newly created collections |
+| `consistency_level` | str | `"BOUNDED"` | Milvus consistency level for collection creation, query, and search |
+| `max_get_limit` | int | `10000` | Maximum number of documents returned by `get` when no limit is specified |
+| `load_timeout_ms` | long | `120000` | Timeout, in milliseconds, for loading collections |
+| `uri` | str | `"http://localhost:19530"` | Milvus endpoint URI; takes precedence over `host`/`port` |
+| `host` | str | `"localhost"` | Milvus host used when `uri` is not set |
+| `port` | int | `19530` | Milvus port used when `uri` is not set |
+| `db_name` | str | None | Milvus database name |
+| `token` | str | None | Token for Milvus authentication (alternative to `username`/`password`) |
+| `username` | str | None | Username for basic authentication |
+| `password` | str | None | Password for basic authentication |
+| `enable_precheck` | bool | `false` | Whether to enable Milvus client precheck |
+
+{{< hint info >}}
+When creating a collection, MilvusVectorStore creates a primary-key field, content field, JSON metadata field, vector field, vector index, and JSON metadata indexes. The default metadata JSON path indexes cover common filter keys such as `user_id`, `agent_id`, `run_id`, `actor_id`, and `category`; list additional application-specific filter keys in `metadata_index_keys`.
+
+The default shard count is `1`, and `num_shards` only takes effect when a collection is newly created. As a rough capacity-planning rule, use about one shard per 100 million vectors, and increase the shard count further for write-heavy workloads.
+{{< /hint >}}
+
+#### Usage Example
+
+{{< tabs "Milvus Usage Example" >}}
+
+{{< tab "Java" >}}
+
+```java
+@VectorStore
+public static ResourceDescriptor vectorStore() {
+ return
ResourceDescriptor.Builder.newBuilder(ResourceName.VectorStore.MILVUS_VECTOR_STORE)
+ .addInitialArgument("embedding_model", "embeddingModel")
+ .addInitialArgument("uri", "http://localhost:19530")
+ .addInitialArgument("collection", "my_documents")
+ .addInitialArgument("dims", 1536)
+ .addInitialArgument("metric_type", "COSINE")
+ .addInitialArgument("index_type", "AUTOINDEX")
+ // Optional metadata JSON path indexes
+ // .addInitialArgument("metadata_index_keys", List.of("user_id", "agent_id", "run_id"))
+ .build();
+}
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
## Using Cross-Language Providers
-Flink Agents supports cross-language vector store integration, allowing you to use vector stores implemented in one language (Java or Python) from agents written in the other language. This is particularly useful when a vector store provider is only available in one language (e.g., Elasticsearch is currently Java-only, Chroma is currently Python-only).
+Flink Agents supports cross-language vector store integration, allowing you to use vector stores implemented in one language (Java or Python) from agents written in the other language. This is particularly useful when a vector store provider is only available in one language (e.g., Elasticsearch and Milvus are currently Java-only, Chroma is currently Python-only).
{{< hint warning >}}
**Limitations:**
@@ -1101,4 +1178,4 @@ public class MyVectorStore extends BaseVectorStore
{{< /tab >}}
-{{< /tabs >}}
\ No newline at end of file
+{{< /tabs >}}
diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md
index 6931ec3c..0f3b4786 100644
--- a/docs/content/docs/faq/faq.md
+++ b/docs/content/docs/faq/faq.md
@@ -117,6 +117,7 @@ Flink Agents provides built-in integrations for many
ecosystem providers. Some i
|---|---|---|
| [Chroma]({{< ref "docs/development/vector_stores#chroma" >}}) | ✅ | ❌ |
| [Elasticsearch]({{< ref "docs/development/vector_stores#elasticsearch" >}}) | ❌ | ✅ |
+| [Milvus]({{< ref "docs/development/vector_stores#milvus" >}}) | ❌ | ✅ |
**MCP Server**
@@ -131,4 +132,4 @@ Flink Agents provides built-in integrations for many
ecosystem providers. Some i
To avoid potential conflict with Flink cluster, the scope of the dependencies
related to Flink and Flink Agents for agent job are provided. See [Maven
Dependencies]({{< ref
"docs/get-started/installation#maven-dependencies-for-java" >}}) for details.
To run the examples in IDE, users must enable the IDE feature: `add
dependencies with provided scope to classpath`.
-* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add
dependencies with provided scope to classpath`**. See [Run/Debug
Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html)
for details.
\ No newline at end of file
+* For **IDEA**, edit the **`Run/Debug Configuration`** and enable **`add
dependencies with provided scope to classpath`**. See [Run/Debug
Configuration](https://www.jetbrains.com/help/idea/run-debug-configuration-scala.html)
for details.
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
index 2d19a8e1..8b2f0242 100644
--- a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
@@ -35,7 +35,7 @@
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
<version>${project.version}</version>
</dependency>
- <!-- Required by Mem0LongTermMemoryTest: OpenAI-compatible chat model
+ ES vector store. -->
+ <!-- Required by Mem0LongTermMemoryTest: OpenAI-compatible chat model
+ vector stores. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-chat-models-openai</artifactId>
@@ -46,6 +46,11 @@
<artifactId>flink-agents-integrations-vector-stores-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-agents-integrations-vector-stores-milvus</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -67,4 +72,4 @@
<version>${flink.version}</version>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryAgent.java
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryAgent.java
index d68bb258..9a7655b6 100644
---
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryAgent.java
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryAgent.java
@@ -52,19 +52,20 @@ import java.util.UUID;
* full retrieved item set.
*
* <p>All resources are declared as native Java implementations (Ollama chat /
embedding,
- * Elasticsearch vector store). Python's mem0 adapter consumes them through
the cross-language
- * bridge: {@code ctx.get_resource(name, type)} on the Python side returns a
Java*Impl wrapper that
- * delegates back into Java via Pemja.
+ * Elasticsearch or Milvus vector store). Python's mem0 adapter consumes them
through the
+ * cross-language bridge: {@code ctx.get_resource(name, type)} on the Python
side returns a
+ * Java*Impl wrapper that delegates back into Java via Pemja.
*
- * <p>The test driving this agent must (1) pull the Ollama models and (2)
provide ES connection env
- * vars ({@code ES_HOST}, {@code ES_INDEX}, {@code ES_DIMS}, {@code
ES_VECTOR_FIELD}, optional
- * {@code ES_USERNAME}/{@code ES_PASSWORD}); see {@link
Mem0LongTermMemoryTest}.
+ * <p>The test driving this agent must (1) pull the Ollama models and (2)
provide vector-store
+ * connection env vars; see {@link Mem0LongTermMemoryTest}.
*/
public class Mem0LongTermMemoryAgent extends Agent {
public static final String CHAT_MODEL = "qwen3.6-plus";
public static final String OLLAMA_EMBEDDING_MODEL = "nomic-embed-text";
public static final String MEMORY_SET_NAME = "test_ltm";
+ public static final String ES_LTM_STORE = "esLtmStore";
+ public static final String MILVUS_LTM_STORE = "milvusLtmStore";
/** Mirrors the Python e2e: dashscope-hosted OpenAI-compatible endpoint,
env-overridable. */
private static final String DEFAULT_BASE_URL =
"https://coding.dashscope.aliyuncs.com/v1";
@@ -139,7 +140,9 @@ public class Mem0LongTermMemoryAgent extends Agent {
ResourceDescriptor.Builder.newBuilder(
ResourceName.VectorStore.ELASTICSEARCH_VECTOR_STORE)
.addInitialArgument("embedding_model",
"ollamaNomicEmbedText")
- .addInitialArgument("host", System.getenv("ES_HOST"))
+ .addInitialArgument(
+ "host",
+ System.getenv().getOrDefault("ES_HOST",
"http://localhost:9200"))
.addInitialArgument(
"collection",
UUID.randomUUID().toString().substring(0, 8) +
"-context");
@@ -152,6 +155,23 @@ public class Mem0LongTermMemoryAgent extends Agent {
return builder.build();
}
+ @VectorStore
+ public static ResourceDescriptor milvusLtmStore() {
+ return
ResourceDescriptor.Builder.newBuilder(ResourceName.VectorStore.MILVUS_VECTOR_STORE)
+ .addInitialArgument("embedding_model", "ollamaNomicEmbedText")
+ .addInitialArgument(
+ "uri", System.getenv().getOrDefault("MILVUS_URI",
"http://localhost:19530"))
+ .addInitialArgument(
+ "collection",
+ "flink_agents_mem0_" +
UUID.randomUUID().toString().replace("-", ""))
+ .addInitialArgument("dims", 768)
+ // Test-only: Mem0 e2e reads immediately after writes.
Production should use the
+ // default BOUNDED consistency unless immediate
read-after-write visibility is
+ // required.
+ .addInitialArgument("consistency_level", "STRONG")
+ .build();
+ }
+
@Action(listenEventTypes = {InputEvent.EVENT_TYPE})
public static void addItems(Event event, RunnerContext ctx) throws
Exception {
InputEvent inputEvent = InputEvent.fromEvent(event);
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryTest.java
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryTest.java
index d11b7dfe..166db7ad 100644
---
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryTest.java
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/Mem0LongTermMemoryTest.java
@@ -28,7 +28,8 @@ import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Instant;
@@ -38,6 +39,8 @@ import java.util.List;
import java.util.Map;
import static
org.apache.flink.agents.resource.test.CrossLanguageTestPreparationUtils.pullModel;
+import static
org.apache.flink.agents.resource.test.Mem0LongTermMemoryAgent.ES_LTM_STORE;
+import static
org.apache.flink.agents.resource.test.Mem0LongTermMemoryAgent.MILVUS_LTM_STORE;
import static
org.apache.flink.agents.resource.test.Mem0LongTermMemoryAgent.OLLAMA_EMBEDDING_MODEL;
/**
@@ -54,7 +57,8 @@ import static
org.apache.flink.agents.resource.test.Mem0LongTermMemoryAgent.OLLA
* <li>{@code ACTION_API_KEY} env var (and optionally {@code
ACTION_BASE_URL}) for the
* OpenAI-compatible chat model — mirrors the Python e2e test's setup
* <li>{@code python} on PATH with {@code mem0ai} and {@code flink_agents}
installed
- * <li>Elasticsearch reachable via the {@code ES_HOST} env var
+ * <li>Elasticsearch reachable via the {@code ES_HOST} env var, or Milvus
reachable via the {@code
+ * MILVUS_URI} env var
* </ul>
*/
public class Mem0LongTermMemoryTest {
@@ -62,25 +66,29 @@ public class Mem0LongTermMemoryTest {
private final boolean embeddingReady;
private final boolean pythonReady;
private final boolean esConfigured;
+ private final boolean milvusConfigured;
private final boolean apiKeySet;
public Mem0LongTermMemoryTest() throws IOException {
embeddingReady = pullModel(OLLAMA_EMBEDDING_MODEL);
pythonReady = isPythonAvailable();
esConfigured = System.getenv("ES_HOST") != null;
+ milvusConfigured = System.getenv("MILVUS_URI") != null;
apiKeySet = System.getenv("ACTION_API_KEY") != null;
}
- @Test
+ @ParameterizedTest(name = "vectorStore={0}")
+ @ValueSource(strings = {ES_LTM_STORE, MILVUS_LTM_STORE})
@Disabled("Using mem0 in java depends on the pemja fix.")
- public void testMem0LongTermMemory() throws Exception {
+ public void testMem0LongTermMemory(String vectorStore) throws Exception {
Assumptions.assumeTrue(
embeddingReady,
"Ollama is not reachable or the embedding model could not be
pulled");
Assumptions.assumeTrue(
pythonReady,
"`python` executable not found on PATH; this test requires
Python with mem0ai installed");
- Assumptions.assumeTrue(esConfigured, "Elasticsearch env var (ES_HOST)
is not set");
+ Assumptions.assumeTrue(
+ isVectorStoreConfigured(vectorStore),
vectorStoreMissingMessage(vectorStore));
Assumptions.assumeTrue(
apiKeySet,
"ACTION_API_KEY env var is not set; required for the
OpenAI-compatible chat model");
@@ -105,7 +113,7 @@ public class Mem0LongTermMemoryTest {
agentsEnv
.getConfig()
.set(LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP,
"ollamaNomicEmbedText");
- agentsEnv.getConfig().set(LongTermMemoryOptions.Mem0.VECTOR_STORE,
"esLtmStore");
+ agentsEnv.getConfig().set(LongTermMemoryOptions.Mem0.VECTOR_STORE,
vectorStore);
DataStream<Object> outputStream =
agentsEnv
@@ -129,6 +137,26 @@ public class Mem0LongTermMemoryTest {
}
}
+ private boolean isVectorStoreConfigured(String vectorStore) {
+ if (ES_LTM_STORE.equals(vectorStore)) {
+ return esConfigured;
+ }
+ if (MILVUS_LTM_STORE.equals(vectorStore)) {
+ return milvusConfigured;
+ }
+ throw new IllegalArgumentException("Unknown vector store: " +
vectorStore);
+ }
+
+ private static String vectorStoreMissingMessage(String vectorStore) {
+ if (ES_LTM_STORE.equals(vectorStore)) {
+ return "Elasticsearch env var (ES_HOST) is not set";
+ }
+ if (MILVUS_LTM_STORE.equals(vectorStore)) {
+ return "Milvus env var (MILVUS_URI) is not set";
+ }
+ return "Unknown vector store: " + vectorStore;
+ }
+
@SuppressWarnings("unchecked")
private void checkResult(CloseableIterator<Object> results) throws
Exception {
Map<String, Map<String, Object>> records = new HashMap<>();
diff --git
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
index e252fa08..cacf6244 100644
---
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
+++
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
@@ -59,6 +59,7 @@ import java.util.Map;
public class VectorStoreCrossLanguageAgent extends Agent {
public static final String OLLAMA_MODEL = "nomic-embed-text";
public static final String TEST_COLLECTION = "test_collection";
+ private static final String VECTOR_STORE_BACKEND = "CHROMA";
@EmbeddingModelConnection
public static ResourceDescriptor embeddingConnection() {
@@ -121,7 +122,8 @@ public class VectorStoreCrossLanguageAgent extends Agent {
TEST_COLLECTION,
Map.of("metadata", Map.of("key1", "value1", "key2",
"value2")));
- System.out.println("[TEST] Vector store Collection Management
PASSED");
+ System.out.printf(
+ "[TEST][%s] Vector store Collection Management PASSED%n",
VECTOR_STORE_BACKEND);
vectorStore.deleteCollection(TEST_COLLECTION);
Assertions.assertThrows(
@@ -168,7 +170,8 @@ public class VectorStoreCrossLanguageAgent extends Agent {
Assertions.assertEquals(
Map.of("category", "database", "source", "test"),
doc.getMetadata());
- System.out.println("[TEST] Vector store Document Management
PASSED");
+ System.out.printf(
+ "[TEST][%s] Vector store Document Management PASSED%n",
VECTOR_STORE_BACKEND);
// Verify VectorStoreQuery.filters survives the Java->Python
bridge.
// ChromaDB applies the unified-DSL filter to its `where` clause,
so the
@@ -191,7 +194,8 @@ public class VectorStoreCrossLanguageAgent extends Agent {
filteredDocs.get(0).getId(),
"Filter {category=database} should match doc2");
- System.out.println("[TEST] Vector store filter query PASSED");
+ System.out.printf(
+ "[TEST][%s] Vector store filter query PASSED%n",
VECTOR_STORE_BACKEND);
ctx.getShortTermMemory().set("is_initialized", true);
}
@@ -244,12 +248,16 @@ public class VectorStoreCrossLanguageAgent extends Agent {
first.getContent().substring(0, Math.min(50,
first.getContent().length())));
ctx.sendEvent(new OutputEvent(result));
- System.out.printf("[TEST] Vector store retrieval PASSED,
count=%d%n", documents.size());
+ System.out.printf(
+ "[TEST][%s] Vector store retrieval PASSED, count=%d%n",
+ VECTOR_STORE_BACKEND, documents.size());
} catch (Exception e) {
result.put("test_status", "FAILED");
result.put("error", e.getMessage());
ctx.sendEvent(new OutputEvent(result));
- System.err.printf("[TEST] Vector store retrieval FAILED: %s%n",
e.getMessage());
+ System.err.printf(
+ "[TEST][%s] Vector store retrieval FAILED: %s%n",
+ VECTOR_STORE_BACKEND, e.getMessage());
throw e;
}
}
diff --git a/integrations/pom.xml b/integrations/pom.xml
index 9989a5f0..75404881 100644
--- a/integrations/pom.xml
+++ b/integrations/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<ollama4j.version>1.1.5</ollama4j.version>
<elasticsearch.version>8.19.0</elasticsearch.version>
+ <milvus.version>2.6.18</milvus.version>
<openai.version>4.8.0</openai.version>
<anthropic.version>2.11.1</anthropic.version>
<aws.sdk.version>2.32.16</aws.sdk.version>
diff --git a/integrations/pom.xml b/integrations/vector-stores/milvus/pom.xml
similarity index 57%
copy from integrations/pom.xml
copy to integrations/vector-stores/milvus/pom.xml
index 9989a5f0..d6521021 100644
--- a/integrations/pom.xml
+++ b/integrations/vector-stores/milvus/pom.xml
@@ -22,27 +22,31 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-agents</artifactId>
+ <artifactId>flink-agents-integrations-vector-stores</artifactId>
<version>0.3-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>flink-agents-integrations</artifactId>
- <name>Flink Agents : Integrations:</name>
- <packaging>pom</packaging>
+ <artifactId>flink-agents-integrations-vector-stores-milvus</artifactId>
+ <name>Flink Agents : Integrations: Vector Stores: Milvus</name>
+ <packaging>jar</packaging>
- <properties>
- <ollama4j.version>1.1.5</ollama4j.version>
- <elasticsearch.version>8.19.0</elasticsearch.version>
- <openai.version>4.8.0</openai.version>
- <anthropic.version>2.11.1</anthropic.version>
- <aws.sdk.version>2.32.16</aws.sdk.version>
- </properties>
-
- <modules>
- <module>chat-models</module>
- <module>embedding-models</module>
- <module>vector-stores</module>
- <module>mcp</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-agents-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.milvus</groupId>
+ <artifactId>milvus-sdk-java</artifactId>
+ <version>${milvus.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/integrations/vector-stores/milvus/src/main/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStore.java
b/integrations/vector-stores/milvus/src/main/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStore.java
new file mode 100644
index 00000000..7eafcd35
--- /dev/null
+++
b/integrations/vector-stores/milvus/src/main/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStore.java
@@ -0,0 +1,1194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.milvus;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.common.ConsistencyLevel;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.common.IndexParam;
+import io.milvus.v2.service.collection.request.AddFieldReq;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DropCollectionReq;
+import io.milvus.v2.service.collection.request.GetLoadStateReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.collection.request.LoadCollectionReq;
+import io.milvus.v2.service.vector.request.DeleteReq;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.QueryReq;
+import io.milvus.v2.service.vector.request.SearchReq;
+import io.milvus.v2.service.vector.request.UpsertReq;
+import io.milvus.v2.service.vector.request.data.BaseVector;
+import io.milvus.v2.service.vector.request.data.FloatVec;
+import io.milvus.v2.service.vector.response.QueryResp;
+import io.milvus.v2.service.vector.response.SearchResp;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Milvus-backed implementation of a vector store.
+ *
+ * <p>This implementation executes dense-vector similarity search against a
Milvus collection. It
+ * integrates with an embedding model (configured via the {@code
embedding_model} resource argument
+ * inherited from {@link BaseVectorStore}) to convert query text into
embeddings and then performs
+ * vector search using Milvus' search API.
+ *
+ * <p>The store creates collections with a simple dense-vector schema (default field names):
+ *
+ * <ul>
+ * <li>{@code id}: VarChar primary key
+ * <li>{@code content}: VarChar document content
+ * <li>{@code metadata}: JSON metadata map
+ * <li>{@code embedding}: FloatVector
+ * </ul>
+ *
+ * <p>Configuration is provided through {@link ResourceDescriptor} arguments.
The most relevant ones
+ * are:
+ *
+ * <ul>
+ * <li>{@code collection}, {@code collection_name}, or {@code index} (optional): Target
+ * collection name. If omitted, defaults to {@link #DEFAULT_COLLECTION}.
+ * <li>{@code dims} (optional): Vector dimensionality; defaults to {@link
#DEFAULT_DIMENSION}.
+ * <li>{@code vector_field}, {@code content_field}, {@code metadata_field},
{@code id_field}
+ * (optional): Schema field names.
+ * <li>{@code metric_type} (optional): Milvus metric type; defaults to
{@code COSINE}.
+ * <li>{@code index_type} and {@code index_params} (optional): Milvus vector
index settings.
+ * <li>{@code metadata_index_keys} (optional): Additional top-level metadata
keys to index as JSON
+ * path indexes. The default keys are {@code user_id}, {@code agent_id},
{@code run_id},
+ * {@code actor_id}, and {@code category}.
+ * <li>{@code metadata_index_cast_types} (optional): Map from metadata key
to Milvus JSON index
+ * cast type. Defaults to {@code VARCHAR}; use values such as {@code
DOUBLE} for numeric
+ * metadata keys.
+ * <li>{@code num_shards} (optional): Number of Milvus shards to create with
the collection;
+ * defaults to {@link #DEFAULT_NUM_SHARDS}. As a rough capacity-planning
rule, use about one
+ * shard per 100 million vectors, and increase it for heavier write
throughput.
+ * <li>{@code consistency_level} (optional): Milvus consistency level for
query and search;
+ * defaults to {@code BOUNDED}. Use {@code STRONG} when immediate
read-after-write visibility
+ * is required.
+ * <li>{@code load_timeout_ms} (optional): Timeout used when loading a
collection from {@link
+ * #createCollectionIfNotExists(String, Map)}; defaults to {@link
#DEFAULT_LOAD_TIMEOUT_MS}.
+ * <li>{@code uri}, or {@code host}/{@code port} (optional): Milvus
endpoint. If omitted, defaults
+ * to {@code http://localhost:19530}.
+ * <li>Authentication (optional): Either token auth via {@code token}, or
basic auth via {@code
+ * username}/{@code password}.
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * ResourceDescriptor desc = ResourceDescriptor.Builder
+ * .newBuilder(MilvusVectorStore.class.getName())
+ * .addInitialArgument("embedding_model", "textEmbedder")
+ * .addInitialArgument("uri", "http://localhost:19530")
+ * .addInitialArgument("collection", "my_documents")
+ * .addInitialArgument("dims", 768)
+ * .addInitialArgument("metric_type", "COSINE")
+ * .addInitialArgument("index_type", "AUTOINDEX")
+ * .build();
+ * }</pre>
+ */
+public class MilvusVectorStore extends BaseVectorStore implements
CollectionManageableVectorStore {
+
+    /**
+     * Default collection name used when {@code collection}, {@code collection_name}, and {@code
+     * index} are all omitted from the descriptor.
+     */
+    public static final String DEFAULT_COLLECTION = "flink_agents_milvus_collection";
+    /** Default primary key field name; overridable via the {@code id_field} argument. */
+    public static final String DEFAULT_ID_FIELD = "id";
+    /**
+     * Default field name used to store document content; overridable via the {@code
+     * content_field} argument.
+     */
+    public static final String DEFAULT_CONTENT_FIELD = "content";
+    /**
+     * Default JSON field name used to store document metadata; overridable via the {@code
+     * metadata_field} argument.
+     */
+    public static final String DEFAULT_METADATA_FIELD = "metadata";
+    /**
+     * Default FloatVector field name on which Milvus search is executed; overridable via the
+     * {@code vector_field} argument.
+     */
+    public static final String DEFAULT_VECTOR_FIELD = "embedding";
+    /** Index name given to the full-object JSON index created on the metadata field. */
+    public static final String DEFAULT_METADATA_INDEX_NAME = "metadata_json_index";
+    /**
+     * Metadata keys that are commonly used by Mem0 and vector-store filter callers. These are the
+     * default keys that receive JSON path indexes (see {@code metadata_index_keys}).
+     */
+    public static final List<String> DEFAULT_METADATA_INDEX_KEYS =
+            List.of("user_id", "agent_id", "run_id", "actor_id", "category");
+    /**
+     * Milvus JSON cast type used for a metadata path index when {@code metadata_index_cast_types}
+     * has no entry for that key.
+     */
+    public static final String DEFAULT_METADATA_INDEX_CAST_TYPE = "VARCHAR";
+    /** Default vector dimensionality used when {@code dims} is not provided. */
+    public static final int DEFAULT_DIMENSION = 768;
+    /**
+     * Default for the {@code max_get_limit} argument, which is the query limit applied by get when
+     * the caller passes no limit.
+     */
+    public static final int DEFAULT_MAX_GET_LIMIT = 10000;
+    /**
+     * Default maximum length for the VarChar primary key field; overridable via {@code
+     * id_max_length}.
+     */
+    public static final int DEFAULT_ID_MAX_LENGTH = 65535;
+    /**
+     * Default maximum length for the VarChar content field; overridable via {@code
+     * content_max_length}.
+     */
+    public static final int DEFAULT_CONTENT_MAX_LENGTH = 65535;
+    /**
+     * Default number of Milvus shards used when creating a collection; overridable via {@code
+     * num_shards}.
+     */
+    public static final int DEFAULT_NUM_SHARDS = 1;
+    /**
+     * Default timeout, in milliseconds, for synchronous collection load operations; overridable via
+     * {@code load_timeout_ms}.
+     */
+    public static final long DEFAULT_LOAD_TIMEOUT_MS = 120000L;
+
+    /** Milvus connection configuration built from the resource descriptor in the constructor. */
+    private final ConnectConfig connectConfig;
+    /**
+     * Lazily-created Milvus client used to execute collection and vector requests. It is created by
+     * {@code client()} under a lock (volatile so the unlocked fast-path read sees the published
+     * instance) and reset to {@code null} by {@link #close()}.
+     */
+    private transient volatile @Nullable MilvusClientV2 client;
+
+    /** Gson instance used to build JSON rows for writes and to convert JSON results into maps. */
+    private final Gson gson = new Gson();
+
+    /** Resolved Milvus endpoint URI; an {@code https://} prefix enables a secure connection. */
+    private final String uri;
+    /** Optional Milvus database name ({@code db_name}); when set it is attached to each request. */
+    private final @Nullable String databaseName;
+    /** Default collection name used when a per-call collection is not supplied. */
+    private final String defaultCollection;
+    /** Name of the VarChar primary key field. */
+    private final String idField;
+    /** Name of the VarChar field that stores the document content. */
+    private final String contentField;
+    /** Name of the nullable JSON field that stores document metadata. */
+    private final String metadataField;
+    /** Name of the FloatVector field on which vector search is executed. */
+    private final String vectorField;
+    /** Vector dimensionality of the {@link #vectorField} used when creating collections. */
+    private final int dims;
+    /** Query limit used by get when no limit is provided ({@code max_get_limit}). */
+    private final int maxGetLimit;
+    /** Maximum length for the VarChar primary key field. */
+    private final int idMaxLength;
+    /** Maximum length for the VarChar content field. */
+    private final int contentMaxLength;
+    /** Default Milvus metric type used for collection indexes and search. */
+    private final IndexParam.MetricType metricType;
+    /** Default Milvus index type used for the vector index when creating collections. */
+    private final IndexParam.IndexType indexType;
+    /** Extra vector index parameters passed to Milvus collection creation. */
+    private final Map<String, Object> indexParams;
+    /** Metadata JSON keys indexed with path-specific indexes during collection creation. */
+    private final List<String> metadataIndexKeys;
+    /** Per-metadata-key JSON cast type overrides for path-specific indexes. */
+    private final Map<String, String> metadataIndexCastTypes;
+    /** Number of shards used when creating collections. */
+    private final int numShards;
+    /** Consistency level used for collection creation, query, and search requests. */
+    private final ConsistencyLevel consistencyLevel;
+    /** Timeout in milliseconds used when loading collections from create-collection paths. */
+    private final long loadTimeoutMs;
+    /**
+     * Builds a Milvus vector store from a resource descriptor.
+     *
+     * <p>Every connection, authentication, schema, index, and query setting is read from the
+     * descriptor arguments here, falling back to built-in defaults. Only a {@link ConnectConfig}
+     * is prepared; the Milvus client itself is not created until first use.
+     *
+     * @param descriptor Resource descriptor containing configuration arguments
+     * @param resourceContext Context used to resolve other resources by name and type
+     */
+    public MilvusVectorStore(ResourceDescriptor descriptor, ResourceContext resourceContext) {
+        super(descriptor, resourceContext);
+
+        // Endpoint and database.
+        this.uri = resolveUri(descriptor);
+        this.databaseName = stringArg(descriptor, "db_name", null);
+
+        // Default collection: "collection" beats "collection_name", which beats "index".
+        String fromIndex = stringArg(descriptor, "index", DEFAULT_COLLECTION);
+        String fromCollectionName = stringArg(descriptor, "collection_name", fromIndex);
+        this.defaultCollection = stringArg(descriptor, "collection", fromCollectionName);
+
+        // Schema field names and sizes.
+        this.idField = stringArg(descriptor, "id_field", DEFAULT_ID_FIELD);
+        this.contentField = stringArg(descriptor, "content_field", DEFAULT_CONTENT_FIELD);
+        this.metadataField = stringArg(descriptor, "metadata_field", DEFAULT_METADATA_FIELD);
+        this.vectorField = stringArg(descriptor, "vector_field", DEFAULT_VECTOR_FIELD);
+        this.dims = intArg(descriptor, "dims", DEFAULT_DIMENSION);
+        this.maxGetLimit = intArg(descriptor, "max_get_limit", DEFAULT_MAX_GET_LIMIT);
+        this.idMaxLength = intArg(descriptor, "id_max_length", DEFAULT_ID_MAX_LENGTH);
+        this.contentMaxLength =
+                intArg(descriptor, "content_max_length", DEFAULT_CONTENT_MAX_LENGTH);
+
+        // Index, sharding, and query behaviour.
+        String metricName =
+                stringArg(descriptor, "metric_type", IndexParam.MetricType.COSINE.name());
+        this.metricType = enumArg(IndexParam.MetricType.class, metricName);
+        String indexTypeName =
+                stringArg(descriptor, "index_type", IndexParam.IndexType.AUTOINDEX.name());
+        this.indexType = enumArg(IndexParam.IndexType.class, indexTypeName);
+        this.indexParams = mapArg(descriptor, "index_params");
+        this.metadataIndexCastTypes = metadataIndexCastTypesArg(descriptor);
+        this.metadataIndexKeys = metadataIndexKeysArg(descriptor, this.metadataIndexCastTypes);
+        this.numShards = intArg(descriptor, "num_shards", DEFAULT_NUM_SHARDS);
+        String consistencyName =
+                stringArg(descriptor, "consistency_level", ConsistencyLevel.BOUNDED.name());
+        this.consistencyLevel = enumArg(ConsistencyLevel.class, consistencyName);
+        this.loadTimeoutMs = longArg(descriptor, "load_timeout_ms", DEFAULT_LOAD_TIMEOUT_MS);
+
+        // Connection settings; the client is created lazily from this config.
+        boolean precheck = booleanArg(descriptor, "enable_precheck", false);
+        ConnectConfig.ConnectConfigBuilder connect =
+                ConnectConfig.builder()
+                        .uri(this.uri)
+                        .secure(this.uri.startsWith("https://"))
+                        .enablePrecheck(precheck);
+
+        String token = stringArg(descriptor, "token", null);
+        boolean useToken = token != null && !token.isEmpty();
+        if (useToken) {
+            connect.token(token);
+        }
+        String username = stringArg(descriptor, "username", null);
+        String password = stringArg(descriptor, "password", null);
+        boolean useBasicAuth = username != null && password != null;
+        if (useBasicAuth) {
+            connect.username(username).password(password);
+        }
+        if (this.databaseName != null) {
+            connect.dbName(this.databaseName);
+        }
+        this.connectConfig = connect.build();
+    }
+
+ @Override
+ public void close() {
+ synchronized (this) {
+ if (this.client != null) {
+ this.client.close();
+ this.client = null;
+ }
+ }
+ }
+
+    /**
+     * Returns the store-level default arguments resolved from the descriptor.
+     *
+     * <p>A new map is built on every call, and the collection-valued entries ({@code
+     * index_params}, {@code metadata_index_keys}, {@code metadata_index_cast_types}) are copies,
+     * so callers may merge per-query arguments into the result without affecting this store.
+     * {@code index} mirrors {@code collection}, and {@code db_name} is present only when a
+     * database was configured.
+     *
+     * @return map of default store arguments for collection creation, retrieval, and search
+     */
+    @Override
+    public Map<String, Object> getStoreKwargs() {
+        Map<String, Object> storeArgs = new HashMap<>();
+
+        // Connection and target collection.
+        storeArgs.put("uri", this.uri);
+        if (this.databaseName != null) {
+            storeArgs.put("db_name", this.databaseName);
+        }
+        String collectionName = this.defaultCollection;
+        storeArgs.put("collection", collectionName);
+        storeArgs.put("index", collectionName);
+
+        // Schema.
+        storeArgs.put("id_field", this.idField);
+        storeArgs.put("content_field", this.contentField);
+        storeArgs.put("metadata_field", this.metadataField);
+        storeArgs.put("vector_field", this.vectorField);
+        storeArgs.put("dims", this.dims);
+
+        // Indexing and collection layout.
+        storeArgs.put("metric_type", this.metricType.name());
+        storeArgs.put("index_type", this.indexType.name());
+        storeArgs.put("index_params", new HashMap<>(this.indexParams));
+        storeArgs.put("metadata_index_keys", new ArrayList<>(this.metadataIndexKeys));
+        storeArgs.put("metadata_index_cast_types", new HashMap<>(this.metadataIndexCastTypes));
+        storeArgs.put("num_shards", this.numShards);
+
+        // Read behaviour.
+        storeArgs.put("consistency_level", this.consistencyLevel.name());
+        storeArgs.put("load_timeout_ms", this.loadTimeoutMs);
+        return storeArgs;
+    }
+
+    /**
+     * Returns the shared Milvus client, creating it from {@link #connectConfig} on first use.
+     *
+     * <p>The volatile field is read once without locking; creation happens under the instance
+     * lock so at most one client is built per open/close cycle.
+     */
+    private MilvusClientV2 client() {
+        MilvusClientV2 snapshot = this.client;
+        if (snapshot != null) {
+            return snapshot;
+        }
+        synchronized (this) {
+            if (this.client == null) {
+                this.client = new MilvusClientV2(this.connectConfig);
+            }
+            return this.client;
+        }
+    }
+
+    /**
+     * Creates the Milvus collection for the given name if it does not already exist, then makes
+     * sure it is loaded.
+     *
+     * <p>The created schema has dynamic fields disabled and contains a VarChar primary key, a
+     * VarChar content field, a nullable JSON metadata field, and one FloatVector field. Besides
+     * the vector index, a full-object JSON index on the metadata field and one JSON path index
+     * per configured metadata key are created.
+     *
+     * <p>Per-call {@code kwargs} may override {@code dims}, {@code metric_type}, {@code
+     * index_type}, {@code index_params}, {@code num_shards}, {@code id_max_length}, {@code
+     * content_max_length}, {@code metadata_index_keys}, {@code metadata_index_cast_types}, and
+     * {@code load_timeout_ms}; anything not supplied falls back to the descriptor defaults.
+     *
+     * <p>When the collection already exists its schema and indexes are left untouched; it is
+     * only loaded if Milvus reports that it is not loaded yet.
+     *
+     * @param name collection name to create or load
+     * @param kwargs per-call overrides; {@code null} is treated as an empty map
+     */
+    @Override
+    public void createCollectionIfNotExists(String name, @Nullable Map<String, Object> kwargs) {
+        // Tolerate callers that pass null instead of an empty map of overrides.
+        Map<String, Object> args =
+                kwargs == null ? Collections.<String, Object>emptyMap() : kwargs;
+
+        if (hasCollection(name)) {
+            ensureCollectionLoaded(name, args);
+            return;
+        }
+
+        // Per-call overrides fall back to the descriptor-level defaults.
+        int dimension = intFromMap(args, "dims", this.dims);
+        IndexParam.MetricType metric =
+                enumFromMap(IndexParam.MetricType.class, args, "metric_type", this.metricType);
+        IndexParam.IndexType index =
+                enumFromMap(IndexParam.IndexType.class, args, "index_type", this.indexType);
+        int shards = intFromMap(args, "num_shards", this.numShards);
+        Map<String, Object> vectorIndexParams =
+                args.containsKey("index_params")
+                        ? objectToMap(args.get("index_params"))
+                        : this.indexParams;
+
+        // Fixed four-field schema; dynamic fields are disabled.
+        CreateCollectionReq.CollectionSchema schema = client().createSchema();
+        schema.setEnableDynamicField(false);
+        schema.addField(
+                AddFieldReq.builder()
+                        .fieldName(this.idField)
+                        .dataType(DataType.VarChar)
+                        .isPrimaryKey(Boolean.TRUE)
+                        .autoID(Boolean.FALSE)
+                        .maxLength(intFromMap(args, "id_max_length", this.idMaxLength))
+                        .build());
+        schema.addField(
+                AddFieldReq.builder()
+                        .fieldName(this.contentField)
+                        .dataType(DataType.VarChar)
+                        .maxLength(intFromMap(args, "content_max_length", this.contentMaxLength))
+                        .build());
+        schema.addField(
+                AddFieldReq.builder()
+                        .fieldName(this.metadataField)
+                        .dataType(DataType.JSON)
+                        .isNullable(Boolean.TRUE)
+                        .build());
+        schema.addField(
+                AddFieldReq.builder()
+                        .fieldName(this.vectorField)
+                        .dataType(DataType.FloatVector)
+                        .dimension(dimension)
+                        .build());
+
+        // Vector index first, then the full metadata JSON index, then per-key path indexes.
+        List<IndexParam> collectionIndexes = new ArrayList<>();
+        collectionIndexes.add(
+                IndexParam.builder()
+                        .fieldName(this.vectorField)
+                        .indexType(index)
+                        .metricType(metric)
+                        .extraParams(vectorIndexParams)
+                        .build());
+        collectionIndexes.add(metadataJsonIndexParam());
+        Map<String, String> metadataCastTypes = metadataIndexCastTypesFromArgs(args);
+        for (String key : metadataIndexKeysFromArgs(args, metadataCastTypes)) {
+            collectionIndexes.add(
+                    metadataJsonPathIndexParam(
+                            key,
+                            metadataCastTypes.getOrDefault(key, DEFAULT_METADATA_INDEX_CAST_TYPE)));
+        }
+
+        CreateCollectionReq.CreateCollectionReqBuilder request =
+                CreateCollectionReq.builder()
+                        .collectionName(name)
+                        .collectionSchema(schema)
+                        .consistencyLevel(this.consistencyLevel)
+                        .numShards(shards)
+                        .indexParams(collectionIndexes);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        client().createCollection(request.build());
+        ensureCollectionLoaded(name, args);
+    }
+
+    /**
+     * Drops the Milvus collection with the given name.
+     *
+     * <p>The drop request is sent with {@code async=false} and, when a database name is
+     * configured, targets that database.
+     */
+    @Override
+    public void deleteCollection(String name) {
+        DropCollectionReq.DropCollectionReqBuilder request = DropCollectionReq.builder();
+        request.collectionName(name);
+        request.async(Boolean.FALSE);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        client().dropCollection(request.build());
+    }
+
+    /**
+     * Retrieves documents from the vector store.
+     *
+     * <p>With a non-empty {@code ids} list, rows are fetched by primary key and {@code filters}
+     * and {@code limit} are ignored, per the {@link BaseVectorStore} contract. Otherwise the
+     * equality-only {@code filters} are translated to a Milvus expression, or every row is
+     * matched when no filters are given.
+     *
+     * @param ids The ids of the documents; may be null or empty.
+     * @param collection The collection to read from; null selects the default collection.
+     * @param filters Unified equality-only filter DSL matched against metadata JSON fields.
+     * @param limit Maximum number of documents to return; when null, the configured {@code
+     *     max_get_limit} (default {@link #DEFAULT_MAX_GET_LIMIT}) is used.
+     * @param extraArgs Additional Milvus-specific arguments (currently unused).
+     * @return List of documents retrieved.
+     * @throws UnsupportedOperationException if a filter value is not a scalar equality value
+     */
+    @Override
+    public List<Document> get(
+            @Nullable List<String> ids,
+            @Nullable String collection,
+            @Nullable Map<String, Object> filters,
+            @Nullable Integer limit,
+            Map<String, Object> extraArgs)
+            throws IOException {
+        String targetCollection = resolveCollection(collection);
+        boolean lookupByIds = ids != null && !ids.isEmpty();
+        if (!lookupByIds) {
+            return getDocuments(targetCollection, filtersToExpression(filters), limit);
+        }
+        return getDocumentsByIds(targetCollection, ids);
+    }
+
+    /**
+     * Fetches documents by primary key with the Milvus query API.
+     *
+     * @param collection The collection to query
+     * @param ids Primary keys to look up
+     * @return Documents for the ids Milvus returned (missing ids are simply absent)
+     */
+    private List<Document> getDocumentsByIds(String collection, List<String> ids) {
+        QueryReq.QueryReqBuilder request =
+                QueryReq.builder()
+                        .consistencyLevel(this.consistencyLevel)
+                        .outputFields(outputFields())
+                        .ids(toObjectIds(ids))
+                        .collectionName(collection);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        List<QueryResp.QueryResult> rows = client().query(request.build()).getQueryResults();
+        return queryResultsToDocuments(rows);
+    }
+
+    /**
+     * Queries documents by Milvus boolean expression.
+     *
+     * @param collection The collection to query
+     * @param filter Milvus boolean expression; null matches every row
+     * @param limit Maximum number of documents; null falls back to the configured max get limit
+     * @return List of Documents
+     */
+    private List<Document> getDocuments(
+            String collection, @Nullable String filter, @Nullable Integer limit) {
+        String expression = filter != null ? filter : allRowsFilter();
+        int effectiveLimit = limit != null ? limit : this.maxGetLimit;
+
+        QueryReq.QueryReqBuilder request =
+                QueryReq.builder()
+                        .collectionName(collection)
+                        .outputFields(outputFields())
+                        .filter(expression)
+                        .consistencyLevel(this.consistencyLevel)
+                        .limit(effectiveLimit);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        List<QueryResp.QueryResult> rows = client().query(request.build()).getQueryResults();
+        return queryResultsToDocuments(rows);
+    }
+
+    /**
+     * Deletes documents from the vector store.
+     *
+     * <p>A non-empty {@code ids} list deletes those primary keys. Otherwise documents matching the
+     * equality-only {@code filters} are deleted; with neither ids nor filters, every document in
+     * the target collection is deleted.
+     *
+     * @param ids The ids of the documents; may be null or empty.
+     * @param collection The collection to delete from; null selects the default collection.
+     * @param filters Unified equality-only filter DSL matched against metadata JSON fields.
+     * @param extraArgs Additional Milvus-specific arguments (currently unused).
+     */
+    @Override
+    public void delete(
+            @Nullable List<String> ids,
+            @Nullable String collection,
+            @Nullable Map<String, Object> filters,
+            Map<String, Object> extraArgs)
+            throws IOException {
+        String targetCollection = resolveCollection(collection);
+        if (ids == null || ids.isEmpty()) {
+            // No ids: delete by filter, or every row when the filter is absent.
+            deleteDocuments(targetCollection, filters);
+            return;
+        }
+        deleteDocumentsByIds(targetCollection, ids);
+    }
+
+    /**
+     * Deletes the given primary keys with the Milvus delete API.
+     *
+     * @param collection The collection to delete from
+     * @param ids Primary keys to delete
+     */
+    private void deleteDocumentsByIds(String collection, List<String> ids) {
+        List<Object> primaryKeys = toObjectIds(ids);
+        DeleteReq.DeleteReqBuilder request = DeleteReq.builder();
+        request.collectionName(collection);
+        request.ids(primaryKeys);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        client().delete(request.build());
+    }
+
+    /**
+     * Deletes documents matching the given filters with the Milvus delete API.
+     *
+     * @param collection The collection to delete from
+     * @param filters Unified equality-only filter DSL; null or empty deletes every row
+     */
+    private void deleteDocuments(String collection, @Nullable Map<String, Object> filters) {
+        String expression = filtersToExpression(filters);
+        if (expression == null) {
+            expression = allRowsFilter();
+        }
+        DeleteReq.DeleteReqBuilder request =
+                DeleteReq.builder().collectionName(collection).filter(expression);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        client().delete(request.build());
+    }
+
+    /**
+     * Runs a Milvus vector search with a pre-computed embedding.
+     *
+     * <p>The configured vector field is searched and only the id, content, and metadata fields are
+     * requested; {@link Document#getScore()} is filled from the hit score Milvus returns. Only the
+     * result group of the single query vector is converted.
+     *
+     * @param embedding The embedding vector to search with
+     * @param limit Maximum number of nearest neighbors to return
+     * @param collection The collection to search; null selects the default collection
+     * @param filters Unified equality-only filter DSL matched against metadata JSON fields
+     * @param args Additional arguments; {@code metric_type} overrides the store metric and a
+     *     non-empty {@code search_params} map is passed through to Milvus
+     * @return Matching documents, possibly empty
+     */
+    @Override
+    public List<Document> queryEmbedding(
+            float[] embedding,
+            int limit,
+            @Nullable String collection,
+            @Nullable Map<String, Object> filters,
+            Map<String, Object> args) {
+        String targetCollection = resolveCollection(collection);
+        IndexParam.MetricType metric =
+                enumFromMap(IndexParam.MetricType.class, args, "metric_type", this.metricType);
+        List<BaseVector> queryVectors = Collections.singletonList(new FloatVec(embedding));
+
+        SearchReq.SearchReqBuilder request =
+                SearchReq.builder()
+                        .collectionName(targetCollection)
+                        .annsField(this.vectorField)
+                        .metricType(metric)
+                        .data(queryVectors)
+                        .limit(limit)
+                        .outputFields(outputFields())
+                        .consistencyLevel(this.consistencyLevel);
+
+        String expression = filtersToExpression(filters);
+        if (expression != null) {
+            request.filter(expression);
+        }
+        Map<String, Object> searchParams = objectToMap(args.get("search_params"));
+        if (!searchParams.isEmpty()) {
+            request.searchParams(searchParams);
+        }
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+
+        List<List<SearchResp.SearchResult>> resultGroups =
+                client().search(request.build()).getSearchResults();
+        if (resultGroups == null || resultGroups.isEmpty()) {
+            return Collections.emptyList();
+        }
+        // One query vector was sent, so only the first result group is relevant.
+        return searchResultsToDocuments(resultGroups.get(0));
+    }
+
+    /**
+     * Inserts documents that already carry embeddings.
+     *
+     * <p>Documents with a null or empty id are written under a random UUID. This always uses
+     * Milvus insert; use {@link #updateEmbedding(List, String, Map)} to replace existing rows.
+     * The public {@link BaseVectorStore#add(List, String, Map)} path computes missing embeddings
+     * before reaching this method.
+     *
+     * @return ids written to Milvus, in input order; empty for a null or empty input
+     * @throws IllegalArgumentException if a document has no embedding
+     */
+    @Override
+    public List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
+            throws IOException {
+        if (documents == null || documents.isEmpty()) {
+            return Collections.emptyList();
+        }
+        String targetCollection = resolveCollection(collection);
+
+        List<String> writtenIds = new ArrayList<>(documents.size());
+        List<JsonObject> rows = new ArrayList<>(documents.size());
+        for (Document document : documents) {
+            String rawId = document.getId();
+            boolean missingId = rawId == null || rawId.isEmpty();
+            String id = missingId ? UUID.randomUUID().toString() : rawId;
+            writtenIds.add(id);
+            rows.add(toRow(id, document));
+        }
+        insertRows(targetCollection, rows);
+        return writtenIds;
+    }
+
+    /**
+     * Replaces documents that already carry embeddings.
+     *
+     * <p>Milvus upsert rewrites rows by primary key. The public {@link BaseVectorStore#update(List,
+     * String, Map)} path already enforces that every document carries an id. Like {@link
+     * #addEmbedding(List, String, Map)}, a null or empty document list is a no-op, so no empty
+     * upsert request is sent to Milvus.
+     *
+     * @throws NullPointerException if a document has no id
+     * @throws IllegalArgumentException if a document has no embedding
+     */
+    @Override
+    public void updateEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
+            throws IOException {
+        if (documents == null || documents.isEmpty()) {
+            return;
+        }
+        String targetCollection = resolveCollection(collection);
+
+        List<JsonObject> rows = new ArrayList<>(documents.size());
+        for (Document doc : documents) {
+            String id =
+                    Objects.requireNonNull(doc.getId(), "Document id must not be null for update.");
+            rows.add(toRow(id, doc));
+        }
+        upsertRows(targetCollection, rows);
+    }
+
+    /** Sends the rows to Milvus as a plain insert into the target collection. */
+    private void insertRows(String targetCollection, List<JsonObject> rows) {
+        InsertReq.InsertReqBuilder request = InsertReq.builder();
+        request.collectionName(targetCollection);
+        request.data(rows);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        client().insert(request.build());
+    }
+
+    /**
+     * Sends the rows to Milvus as a full (non-partial) upsert, so a row whose id already exists
+     * replaces the stored entity.
+     */
+    private void upsertRows(String targetCollection, List<JsonObject> rows) {
+        UpsertReq.UpsertReqBuilder request = UpsertReq.builder();
+        request.collectionName(targetCollection);
+        request.data(rows);
+        request.partialUpdate(false);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        client().upsert(request.build());
+    }
+
+    /**
+     * Builds the JSON row Milvus insert/upsert expects for one document.
+     *
+     * @throws IllegalArgumentException if the document has no embedding
+     */
+    private JsonObject toRow(String id, Document doc) {
+        float[] embedding = doc.getEmbedding();
+        if (embedding == null) {
+            throw new IllegalArgumentException("Document embedding must not be null.");
+        }
+        JsonObject row = new JsonObject();
+        // Scalar columns.
+        row.addProperty(this.idField, id);
+        row.addProperty(this.contentField, doc.getContent());
+        // JSON metadata and the boxed float vector.
+        row.add(this.metadataField, this.gson.toJsonTree(doc.getMetadata()));
+        row.add(this.vectorField, this.gson.toJsonTree(toFloatList(embedding)));
+        return row;
+    }
+
+    /** Asks Milvus whether the collection exists, scoped to the configured database if any. */
+    private boolean hasCollection(String collectionName) {
+        HasCollectionReq.HasCollectionReqBuilder request = HasCollectionReq.builder();
+        request.collectionName(collectionName);
+        if (this.databaseName != null) {
+            request.databaseName(this.databaseName);
+        }
+        return client().hasCollection(request.build());
+    }
+
+    /**
+     * Loads the collection synchronously unless Milvus already reports it as loaded.
+     *
+     * <p>The load timeout comes from {@code load_timeout_ms} in {@code extraArgs}, falling back
+     * to the descriptor-level value.
+     */
+    private void ensureCollectionLoaded(String collectionName, Map<String, Object> extraArgs) {
+        GetLoadStateReq.GetLoadStateReqBuilder stateRequest =
+                GetLoadStateReq.builder().collectionName(collectionName);
+        if (this.databaseName != null) {
+            stateRequest.databaseName(this.databaseName);
+        }
+        boolean alreadyLoaded = Boolean.TRUE.equals(client().getLoadState(stateRequest.build()));
+        if (alreadyLoaded) {
+            return;
+        }
+
+        long timeoutMs = longFromMap(extraArgs, "load_timeout_ms", this.loadTimeoutMs);
+        LoadCollectionReq.LoadCollectionReqBuilder loadRequest =
+                LoadCollectionReq.builder()
+                        .collectionName(collectionName)
+                        .sync(Boolean.TRUE)
+                        .timeout(timeoutMs);
+        if (this.databaseName != null) {
+            loadRequest.databaseName(this.databaseName);
+        }
+        client().loadCollection(loadRequest.build());
+    }
+
+    /**
+     * Describes an AUTOINDEX over the whole metadata JSON object.
+     *
+     * <p>The index uses {@code json_path} equal to the metadata field name and {@code
+     * json_cast_type=JSON}, which is the closest Milvus analogue to Elasticsearch's dynamic object
+     * mapping. Filters are still expressed by logical metadata key and expanded to {@code
+     * metadata["key"]} predicates by {@link #filtersToExpression(Map)}.
+     */
+    private IndexParam metadataJsonIndexParam() {
+        Map<String, Object> jsonIndexParams =
+                Map.of("json_path", this.metadataField, "json_cast_type", "JSON");
+        IndexParam.IndexParamBuilder param = IndexParam.builder();
+        param.fieldName(this.metadataField);
+        param.indexName(DEFAULT_METADATA_INDEX_NAME);
+        param.indexType(IndexParam.IndexType.AUTOINDEX);
+        param.extraParams(jsonIndexParams);
+        return param.build();
+    }
+
+    /**
+     * Describes an AUTOINDEX on one top-level metadata key.
+     *
+     * <p>The logical key (for example {@code user_id}) is mapped to the JSON path {@code
+     * metadata["user_id"]} and indexed with the given cast type; string-like keys default to
+     * {@code VARCHAR}, and {@code metadata_index_cast_types} can override it for numeric or
+     * boolean keys.
+     */
+    private IndexParam metadataJsonPathIndexParam(String key, String castType) {
+        String jsonPath = this.metadataField + "[\"" + key + "\"]";
+        Map<String, Object> pathIndexParams =
+                Map.of("json_path", jsonPath, "json_cast_type", castType);
+        return IndexParam.builder()
+                .extraParams(pathIndexParams)
+                .indexType(IndexParam.IndexType.AUTOINDEX)
+                .indexName(metadataJsonPathIndexName(key))
+                .fieldName(this.metadataField)
+                .build();
+    }
+
+    /** Builds the index name {@code <metadataField>_<key>_json_index} for a metadata path index. */
+    private String metadataJsonPathIndexName(String key) {
+        return String.join("_", this.metadataField, key, "json_index");
+    }
+
+    /**
+     * Picks the collection a call should target.
+     *
+     * <p>An explicit per-call name wins; otherwise the descriptor default is used. The descriptor
+     * accepts an {@code index} alias for Elasticsearch-style configuration, but per-call selection
+     * goes through the dedicated method parameter only.
+     */
+    private String resolveCollection(@Nullable String collectionName) {
+        return collectionName == null ? this.defaultCollection : collectionName;
+    }
+
+    /**
+     * Fields requested from Milvus in query and search results: id, content, and metadata.
+     *
+     * <p>The vector field is deliberately left out; search scores arrive separately on each hit.
+     */
+    private List<String> outputFields() {
+        String id = this.idField;
+        String content = this.contentField;
+        String metadata = this.metadataField;
+        return List.of(id, content, metadata);
+    }
+
+    /** Maps Milvus query results to documents without scores; null or empty gives an empty list. */
+    private List<Document> queryResultsToDocuments(@Nullable List<QueryResp.QueryResult> results) {
+        if (results == null || results.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<Document> converted = new ArrayList<>(results.size());
+        results.forEach(row -> converted.add(entityToDocument(row.getEntity(), null, null)));
+        return converted;
+    }
+
+    /** Maps Milvus search hits to documents, keeping each hit's id and score. */
+    private List<Document> searchResultsToDocuments(List<SearchResp.SearchResult> results) {
+        List<Document> converted = new ArrayList<>(results.size());
+        results.forEach(
+                hit -> converted.add(entityToDocument(hit.getEntity(), hit.getId(), hit.getScore())));
+        return converted;
+    }
+
+    /**
+     * Turns one Milvus entity map into a {@link Document}.
+     *
+     * <p>The id comes from {@code resultId} when given, else from the entity's id field. Missing
+     * content becomes an empty string. The embedding is always null because the vector field is
+     * not requested in {@link #outputFields()}; the score is whatever the caller passes.
+     */
+    private Document entityToDocument(
+            Map<String, Object> entity, @Nullable Object resultId, @Nullable Float score) {
+        Object rawId = resultId != null ? resultId : entity.get(this.idField);
+        String id = rawId != null ? String.valueOf(rawId) : null;
+        Object rawContent = entity.get(this.contentField);
+        String content = rawContent != null ? String.valueOf(rawContent) : "";
+        return new Document(
+                content, objectToMap(entity.get(this.metadataField)), id, null, score);
+    }
+
+    /**
+     * Converts a map-like value (from the Milvus SDK or from per-call arguments) into a mutable
+     * Java map.
+     *
+     * <p>{@link Map} values are copied into a {@link LinkedHashMap} and JSON objects are converted
+     * with Gson. Everything else yields a new empty map, so callers never receive {@code null}:
+     * {@code null}, JSON null (stored for a null metadata column), JSON arrays, JSON primitives,
+     * and unrelated types. Previously JSON null leaked through as {@code null} and JSON arrays
+     * threw.
+     *
+     * <p>NOTE(review): with Gson's default settings, JSON numbers deserialize to {@link Double}
+     * inside a raw {@code Map}, so integer metadata may come back as e.g. {@code 5.0}.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> objectToMap(@Nullable Object value) {
+        if (value instanceof Map) {
+            return new LinkedHashMap<>((Map<String, Object>) value);
+        }
+        if (value instanceof JsonElement && ((JsonElement) value).isJsonObject()) {
+            Map<String, Object> converted = this.gson.fromJson((JsonElement) value, Map.class);
+            return converted == null ? new LinkedHashMap<>() : converted;
+        }
+        // null, JsonNull, arrays, primitives, and unknown types carry no key/value pairs.
+        return new LinkedHashMap<>();
+    }
+
+    /**
+     * Merges per-call {@code metadata_index_cast_types} over the descriptor-level cast types.
+     *
+     * @return a fresh insertion-ordered map; the descriptor map itself is never modified
+     */
+    private Map<String, String> metadataIndexCastTypesFromArgs(Map<String, Object> args) {
+        Map<String, String> merged = new LinkedHashMap<>(this.metadataIndexCastTypes);
+        if (!args.containsKey("metadata_index_cast_types")) {
+            return merged;
+        }
+        putMetadataIndexCastTypes(merged, objectToMap(args.get("metadata_index_cast_types")));
+        return merged;
+    }
+
+    /**
+     * Collects the metadata keys to index, in first-seen order. Descriptor keys come first, then
+     * per-call {@code metadata_index_keys}, then every key that has a cast-type entry.
+     */
+    private List<String> metadataIndexKeysFromArgs(
+            Map<String, Object> args, Map<String, String> castTypes) {
+        LinkedHashMap<String, Boolean> orderedKeys = new LinkedHashMap<>();
+        Object perCallKeys = args.get("metadata_index_keys");
+        putMetadataIndexKeys(orderedKeys, this.metadataIndexKeys);
+        putMetadataIndexKeys(orderedKeys, stringList(perCallKeys));
+        putMetadataIndexKeys(orderedKeys, castTypes.keySet());
+        List<String> result = new ArrayList<>(orderedKeys.keySet());
+        return result;
+    }
+
+    /**
+     * Translates the unified equality-only filter DSL into a Milvus boolean expression.
+     *
+     * <p>Metadata is stored in a JSON field, so each equality filter becomes a JSON subscript
+     * predicate such as {@code metadata["user_id"] == "alice"}. Several filters are combined
+     * with {@code and}.
+     *
+     * @param filters metadata key to expected scalar value; may be {@code null} or empty
+     * @return the Milvus boolean expression, or {@code null} when there is nothing to filter on
+     * @throws UnsupportedOperationException if any value is not a scalar. This covers:
+     *     <ul>
+     *       <li>{@code null};
+     *       <li>a map (operator syntax);
+     *       <li>a collection or array, which would otherwise be stringified into a literal
+     *           that never matches.
+     *     </ul>
+     */
+    @Nullable
+    private String filtersToExpression(@Nullable Map<String, Object> filters) {
+        if (filters == null || filters.isEmpty()) {
+            return null;
+        }
+        List<String> clauses = new ArrayList<>(filters.size());
+        for (Map.Entry<String, Object> entry : filters.entrySet()) {
+            Object value = entry.getValue();
+            if (value == null
+                    || value instanceof Map
+                    || value instanceof Iterable
+                    || value.getClass().isArray()) {
+                throw new UnsupportedOperationException(
+                        "MilvusVectorStore filters support equality shorthand only.");
+            }
+            clauses.add(
+                    this.metadataField
+                            + "[\""
+                            + escapeString(entry.getKey())
+                            + "\"] == "
+                            + literal(value));
+        }
+        return String.join(" and ", clauses);
+    }
+
+    /** Builds a Milvus expression that is true for every row whose primary key is non-empty. */
+    private String allRowsFilter() {
+        return String.format("%s != \"\"", this.idField);
+    }
+
+    /**
+     * Renders {@code value} as a Milvus expression literal.
+     *
+     * <p>Numbers and booleans are emitted verbatim. Anything else is converted to a string,
+     * escaped, and wrapped in double quotes.
+     */
+    private String literal(Object value) {
+        boolean bare = value instanceof Number || value instanceof Boolean;
+        if (!bare) {
+            String escaped = escapeString(String.valueOf(value));
+            return '"' + escaped + '"';
+        }
+        return String.valueOf(value);
+    }
+
+    /** Boxes a primitive embedding into the {@code List<Float>} shape the Milvus SDK expects. */
+    private static List<Float> toFloatList(float[] embedding) {
+        List<Float> boxed = new ArrayList<>(embedding.length);
+        for (int i = 0; i < embedding.length; i++) {
+            boxed.add(Float.valueOf(embedding[i]));
+        }
+        return boxed;
+    }
+
+    /** Copies string ids into the {@code List<Object>} shape used by Milvus get/delete calls. */
+    private static List<Object> toObjectIds(List<String> ids) {
+        List<Object> converted = new ArrayList<>(ids.size());
+        converted.addAll(ids);
+        return converted;
+    }
+
+    /**
+     * Resolves the Milvus endpoint from {@code uri}, {@code host}, and {@code port}.
+     *
+     * <p>Precedence:
+     *
+     * <ol>
+     *   <li>A non-empty {@code uri} is returned verbatim.
+     *   <li>A {@code host} that already carries an {@code http://} or {@code https://} scheme is
+     *       returned verbatim.
+     *   <li>Otherwise {@code http://} is prepended. The {@code port} (default 19530) is appended
+     *       unless the host already names its own port ({@code host:port} or
+     *       {@code [ipv6]:port}).
+     * </ol>
+     *
+     * <p>A bare IPv6 literal such as {@code ::1} is wrapped in brackets so the port separator
+     * stays unambiguous.
+     */
+    private static String resolveUri(ResourceDescriptor descriptor) {
+        String uri = stringArg(descriptor, "uri", null);
+        if (uri != null && !uri.isEmpty()) {
+            return uri;
+        }
+        String host = stringArg(descriptor, "host", "localhost");
+        int port = intArg(descriptor, "port", 19530);
+        if (host.startsWith("http://") || host.startsWith("https://")) {
+            return host;
+        }
+        if (host.startsWith("[")) {
+            // Bracketed IPv6 literal: keep an explicit "]:port", otherwise append the port.
+            return host.endsWith("]") ? "http://" + host + ":" + port : "http://" + host;
+        }
+        int colons = host.length() - host.replace(":", "").length();
+        if (colons == 1) {
+            // "host:port" already names its port.
+            return "http://" + host;
+        }
+        if (colons > 1) {
+            // Bare IPv6 literal such as "::1" must be bracketed before a port can be appended.
+            return "http://[" + host + "]:" + port;
+        }
+        return "http://" + host + ":" + port;
+    }
+
+    /**
+     * Returns a mutable copy of the map-valued descriptor argument {@code key}.
+     *
+     * <p>Yields an empty map when the argument is missing or is not a map.
+     */
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> mapArg(ResourceDescriptor descriptor, String key) {
+        Object raw = descriptor.getArgument(key);
+        if (!(raw instanceof Map)) {
+            return Collections.emptyMap();
+        }
+        Map<String, Object> copy = new HashMap<>();
+        copy.putAll((Map<String, Object>) raw);
+        return copy;
+    }
+
+    /**
+     * Builds the descriptor-level list of metadata keys to index.
+     *
+     * <p>The list contains the built-in defaults, then the configured
+     * {@code metadata_index_keys}, then every key with a cast type. Order is preserved and
+     * duplicates are dropped.
+     */
+    private static List<String> metadataIndexKeysArg(
+            ResourceDescriptor descriptor, Map<String, String> castTypes) {
+        List<String> configured = stringList(descriptor.getArgument("metadata_index_keys"));
+        LinkedHashMap<String, Boolean> unique = new LinkedHashMap<>();
+        putMetadataIndexKeys(unique, DEFAULT_METADATA_INDEX_KEYS);
+        putMetadataIndexKeys(unique, configured);
+        putMetadataIndexKeys(unique, castTypes.keySet());
+        return new ArrayList<>(unique.keySet());
+    }
+
+    /**
+     * Builds the descriptor-level JSON path index cast types.
+     *
+     * <p>Every default key starts with the default cast type. Any configured
+     * {@code metadata_index_cast_types} entries are then applied on top.
+     */
+    private static Map<String, String> metadataIndexCastTypesArg(ResourceDescriptor descriptor) {
+        Map<String, String> result = new LinkedHashMap<>();
+        DEFAULT_METADATA_INDEX_KEYS.forEach(
+                defaultKey -> result.put(defaultKey, DEFAULT_METADATA_INDEX_CAST_TYPE));
+        Map<String, Object> configured = mapArg(descriptor, "metadata_index_cast_types");
+        putMetadataIndexCastTypes(result, configured);
+        return result;
+    }
+
+    /** Validates each key and records it in {@code target}, keeping first-seen order unique. */
+    private static void putMetadataIndexKeys(
+            LinkedHashMap<String, Boolean> target, Iterable<String> keys) {
+        keys.forEach(rawKey -> target.put(normalizeMetadataIndexKey(rawKey), Boolean.TRUE));
+    }
+
+    /** Validates each key/cast-type pair and writes it into {@code target}, replacing prior values. */
+    private static void putMetadataIndexCastTypes(
+            Map<String, String> target, Map<String, Object> castTypes) {
+        castTypes.forEach(
+                (rawKey, rawType) ->
+                        target.put(
+                                normalizeMetadataIndexKey(rawKey),
+                                normalizeMetadataIndexCastType(rawType)));
+    }
+
+    /**
+     * Interprets an argument value as a list of strings.
+     *
+     * <ul>
+     *   <li>Iterables and object arrays contribute every non-null element, converted with
+     *       {@code String.valueOf}.
+     *   <li>Any other value is read as a comma-separated string. Each part is trimmed and empty
+     *       parts are dropped.
+     *   <li>{@code null} or blank text yields an empty list.
+     * </ul>
+     */
+    private static List<String> stringList(@Nullable Object value) {
+        if (value == null) {
+            return Collections.emptyList();
+        }
+        Iterable<?> elements = null;
+        if (value instanceof Iterable) {
+            elements = (Iterable<?>) value;
+        } else if (value instanceof Object[]) {
+            List<Object> arrayItems = new ArrayList<>();
+            Collections.addAll(arrayItems, (Object[]) value);
+            elements = arrayItems;
+        }
+        if (elements != null) {
+            List<String> collected = new ArrayList<>();
+            for (Object element : elements) {
+                if (element != null) {
+                    collected.add(String.valueOf(element));
+                }
+            }
+            return collected;
+        }
+        String text = String.valueOf(value).trim();
+        if (text.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> tokens = new ArrayList<>();
+        for (String token : text.split(",")) {
+            String cleaned = token.trim();
+            if (!cleaned.isEmpty()) {
+                tokens.add(cleaned);
+            }
+        }
+        return tokens;
+    }
+
+    /**
+     * Trims and validates a top-level JSON key used to generate a Milvus path index.
+     *
+     * <p>Only ASCII letters, digits and underscores are accepted, and the key must not start
+     * with a digit. Milvus recommends this subset for JSON keys. It also yields deterministic,
+     * valid index names.
+     *
+     * @throws IllegalArgumentException if the key is {@code null}, blank, or not an identifier
+     */
+    private static String normalizeMetadataIndexKey(String key) {
+        String candidate = key != null ? key.trim() : "";
+        if (candidate.isEmpty()) {
+            throw new IllegalArgumentException("metadata_index_keys cannot contain empty keys.");
+        }
+        boolean valid = isIdentifierStart(candidate.charAt(0));
+        for (int pos = 1; valid && pos < candidate.length(); pos++) {
+            valid = isIdentifierPart(candidate.charAt(pos));
+        }
+        if (!valid) {
+            throw new IllegalArgumentException(
+                    "metadata_index_keys must contain only JSON-safe identifiers: " + key);
+        }
+        return candidate;
+    }
+
+    /**
+     * Upper-cases and validates a Milvus JSON index cast type.
+     *
+     * <p>A {@code null} value maps to the default cast type.
+     *
+     * @throws IllegalArgumentException if the type is not one Milvus supports for JSON indexes
+     */
+    private static String normalizeMetadataIndexCastType(@Nullable Object value) {
+        String type =
+                value != null
+                        ? String.valueOf(value).trim().toUpperCase(Locale.ROOT)
+                        : DEFAULT_METADATA_INDEX_CAST_TYPE;
+        String[] supported = {
+            "BOOL", "DOUBLE", "VARCHAR", "ARRAY_BOOL", "ARRAY_DOUBLE", "ARRAY_VARCHAR", "JSON"
+        };
+        for (String candidate : supported) {
+            if (candidate.equals(type)) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException(
+                "Unsupported Milvus metadata JSON index cast type: " + value);
+    }
+
+    /** Returns whether {@code value} may begin a metadata index key: an ASCII letter or '_'. */
+    private static boolean isIdentifierStart(char value) {
+        return value == '_'
+                || ('a' <= value && value <= 'z')
+                || ('A' <= value && value <= 'Z');
+    }
+
+    /** Returns whether {@code value} may continue a metadata index key: identifier start or digit. */
+    private static boolean isIdentifierPart(char value) {
+        boolean digit = value >= '0' && value <= '9';
+        return digit || isIdentifierStart(value);
+    }
+
+    /** Returns descriptor argument {@code key} converted to a string, or {@code defaultValue} if unset. */
+    private static String stringArg(
+            ResourceDescriptor descriptor, String key, @Nullable String defaultValue) {
+        Object raw = descriptor.getArgument(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return String.valueOf(raw);
+    }
+
+    /**
+     * Returns descriptor argument {@code key} as an {@code int}, or {@code defaultValue} if unset.
+     *
+     * <p>Numbers are narrowed with {@code intValue()}. Other values are parsed from their
+     * string form.
+     */
+    private static int intArg(ResourceDescriptor descriptor, String key, int defaultValue) {
+        Object raw = descriptor.getArgument(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return raw instanceof Number
+                ? ((Number) raw).intValue()
+                : Integer.parseInt(String.valueOf(raw));
+    }
+
+    /**
+     * Returns descriptor argument {@code key} as a {@code long}, or {@code defaultValue} if unset.
+     *
+     * <p>Numbers are converted with {@code longValue()}. Other values are parsed from their
+     * string form.
+     */
+    private static long longArg(ResourceDescriptor descriptor, String key, long defaultValue) {
+        Object raw = descriptor.getArgument(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return raw instanceof Number
+                ? ((Number) raw).longValue()
+                : Long.parseLong(String.valueOf(raw));
+    }
+
+    /**
+     * Returns per-call argument {@code key} as an {@code int}, or {@code defaultValue} if absent.
+     *
+     * <p>Numbers are narrowed with {@code intValue()}. Other values are parsed from their
+     * string form.
+     */
+    private static int intFromMap(Map<String, Object> args, String key, int defaultValue) {
+        Object raw = args.get(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return raw instanceof Number
+                ? ((Number) raw).intValue()
+                : Integer.parseInt(String.valueOf(raw));
+    }
+
+    /**
+     * Returns per-call argument {@code key} as a {@code long}, or {@code defaultValue} if absent.
+     *
+     * <p>Numbers are converted with {@code longValue()}. Other values are parsed from their
+     * string form.
+     */
+    private static long longFromMap(Map<String, Object> args, String key, long defaultValue) {
+        Object raw = args.get(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return raw instanceof Number
+                ? ((Number) raw).longValue()
+                : Long.parseLong(String.valueOf(raw));
+    }
+
+    /**
+     * Returns descriptor argument {@code key} as a boolean, or {@code defaultValue} if unset.
+     *
+     * <p>Non-Boolean values go through {@code Boolean.parseBoolean}, so only a case-insensitive
+     * "true" is true.
+     */
+    private static boolean booleanArg(
+            ResourceDescriptor descriptor, String key, boolean defaultValue) {
+        Object raw = descriptor.getArgument(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return raw instanceof Boolean ? (Boolean) raw : Boolean.parseBoolean(String.valueOf(raw));
+    }
+
+    /** Parses per-call argument {@code key} into {@code enumClass}, or returns {@code defaultValue} if absent. */
+    private static <E extends Enum<E>> E enumFromMap(
+            Class<E> enumClass, Map<String, Object> args, String key, E defaultValue) {
+        Object raw = args.get(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return enumArg(enumClass, String.valueOf(raw));
+    }
+
+    /**
+     * Looks up an enum constant by name.
+     *
+     * <p>Surrounding whitespace is ignored, case is ignored, and '-' is accepted in place of '_'.
+     */
+    private static <E extends Enum<E>> E enumArg(Class<E> enumClass, String value) {
+        String constantName = value.trim().replace('-', '_').toUpperCase(Locale.ROOT);
+        return Enum.valueOf(enumClass, constantName);
+    }
+
+    /** Backslash-escapes backslashes and double quotes so text can sit inside a Milvus string literal. */
+    private static String escapeString(String value) {
+        StringBuilder escaped = new StringBuilder(value.length());
+        for (int i = 0; i < value.length(); i++) {
+            char c = value.charAt(i);
+            if (c == '\\' || c == '"') {
+                escaped.append('\\');
+            }
+            escaped.append(c);
+        }
+        return escaped.toString();
+    }
+}
diff --git
a/integrations/vector-stores/milvus/src/test/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStoreTest.java
b/integrations/vector-stores/milvus/src/test/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStoreTest.java
new file mode 100644
index 00000000..bfd44ceb
--- /dev/null
+++
b/integrations/vector-stores/milvus/src/test/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStoreTest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.vectorstores.milvus;
+
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.common.IndexParam;
+import io.milvus.v2.service.collection.request.DescribeCollectionReq;
+import io.milvus.v2.service.collection.request.GetLoadStateReq;
+import io.milvus.v2.service.collection.request.ReleaseCollectionReq;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.index.request.DescribeIndexReq;
+import io.milvus.v2.service.index.request.ListIndexesReq;
+import io.milvus.v2.service.index.response.DescribeIndexResp;
+import org.apache.flink.agents.api.embedding.model.BaseEmbeddingModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link MilvusVectorStore}.
+ *
+ * <p>Tests annotated with {@code @EnabledIfEnvironmentVariable(named = "MILVUS_URI", ...)} need a
+ * live Milvus endpoint and are skipped otherwise. Each live test uses a uniquely named collection
+ * and drops it in a best-effort {@code finally} block. A failed cleanup therefore never masks the
+ * real assertion failure, and the store and client are always closed.
+ */
+public class MilvusVectorStoreTest {
+
+    /** Resource context that resolves nothing; enough for tests that never call {@code open()}. */
+    private static final ResourceContext NOOP = ResourceContext.fromGetResource((a, b) -> null);
+
+    /**
+     * Verifies that descriptor arguments reach {@code getStoreKwargs()}.
+     *
+     * <p>The expected metadata index keys and cast types are the built-in defaults merged with
+     * the configured ones. Runs without a Milvus server.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    void testConstructorAndStoreKwargs() {
+        ResourceDescriptor desc =
+                ResourceDescriptor.Builder.newBuilder(MilvusVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "embeddingModel")
+                        .addInitialArgument("uri", "http://localhost:19530")
+                        .addInitialArgument("collection", "test_collection")
+                        .addInitialArgument("dims", 5)
+                        .addInitialArgument("num_shards", 3)
+                        .addInitialArgument("metric_type", "IP")
+                        .addInitialArgument("index_type", "IVF_FLAT")
+                        .addInitialArgument("metadata_index_keys", List.of("source"))
+                        .addInitialArgument("metadata_index_cast_types", Map.of("score", "DOUBLE"))
+                        // Test-only custom consistency value to verify descriptor plumbing.
+                        // Production should use the default BOUNDED consistency unless immediate
+                        // read-after-write visibility is required.
+                        .addInitialArgument("consistency_level", "STRONG")
+                        .addInitialArgument("load_timeout_ms", 12345L)
+                        .build();
+
+        MilvusVectorStore store = new MilvusVectorStore(desc, NOOP);
+        try {
+            Map<String, Object> kwargs = store.getStoreKwargs();
+            assertThat(store).isInstanceOf(BaseVectorStore.class);
+            assertThat(store).isInstanceOf(CollectionManageableVectorStore.class);
+            assertThat(kwargs).containsEntry("collection", "test_collection");
+            assertThat(kwargs).containsEntry("index", "test_collection");
+            assertThat(kwargs).containsEntry("dims", 5);
+            assertThat(kwargs).containsEntry("num_shards", 3);
+            assertThat(kwargs).containsEntry("metric_type", "IP");
+            assertThat(kwargs).containsEntry("index_type", "IVF_FLAT");
+            assertThat((List<String>) kwargs.get("metadata_index_keys"))
+                    .containsExactly(
+                            "user_id", "agent_id", "run_id", "actor_id", "category", "source",
+                            "score");
+            assertThat((Map<String, String>) kwargs.get("metadata_index_cast_types"))
+                    .containsEntry("user_id", "VARCHAR")
+                    .containsEntry("agent_id", "VARCHAR")
+                    .containsEntry("run_id", "VARCHAR")
+                    .containsEntry("actor_id", "VARCHAR")
+                    .containsEntry("category", "VARCHAR")
+                    .containsEntry("score", "DOUBLE");
+            assertThat(kwargs).containsEntry("consistency_level", "STRONG");
+            assertThat(kwargs).containsEntry("load_timeout_ms", 12345L);
+            assertThat(kwargs).doesNotContainKey("flush_on_write");
+            assertThat(kwargs).doesNotContainKey("auto_create_collection");
+            assertThat(kwargs).doesNotContainKey("metadata_index_enabled");
+        } finally {
+            store.close();
+        }
+    }
+
+    /** Verifies that a per-call {@code num_shards} argument controls the created shard count. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testCreateCollectionUsesShardCount() throws Exception {
+        String collection = collectionName("shard_count");
+        MilvusVectorStore store = openStore(collection);
+        MilvusClientV2 client =
+                new MilvusClientV2(
+                        ConnectConfig.builder().uri(System.getenv("MILVUS_URI")).build());
+        try {
+            store.createCollectionIfNotExists(collection, Map.of("num_shards", 2));
+
+            DescribeCollectionResp resp =
+                    client.describeCollection(
+                            DescribeCollectionReq.builder().collectionName(collection).build());
+            Assertions.assertEquals(2, resp.getShardsNum());
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+            client.close();
+        }
+    }
+
+    /**
+     * Verifies the JSON indexes created on the metadata field.
+     *
+     * <p>Two kinds of index are checked:
+     *
+     * <ul>
+     *   <li>the whole-field JSON index;
+     *   <li>one path index per default or configured metadata key, each with the expected cast
+     *       type.
+     * </ul>
+     */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testCreateCollectionUsesMetadataJsonIndex() throws Exception {
+        String collection = collectionName("metadata_json_index");
+        ResourceDescriptor desc =
+                ResourceDescriptor.Builder.newBuilder(MilvusVectorStore.class.getName())
+                        .addInitialArgument("embedding_model", "embeddingModel")
+                        .addInitialArgument("uri", System.getenv("MILVUS_URI"))
+                        .addInitialArgument("collection", collection)
+                        .addInitialArgument("dims", 5)
+                        .addInitialArgument("index_type", "AUTOINDEX")
+                        .addInitialArgument("metric_type", "COSINE")
+                        .addInitialArgument("metadata_index_keys", List.of("source"))
+                        .addInitialArgument("metadata_index_cast_types", Map.of("score", "DOUBLE"))
+                        .build();
+        MilvusVectorStore store =
+                new MilvusVectorStore(
+                        desc, ResourceContext.fromGetResource(MilvusVectorStoreTest::getResource));
+        store.open();
+        MilvusClientV2 client =
+                new MilvusClientV2(
+                        ConnectConfig.builder().uri(System.getenv("MILVUS_URI")).build());
+        try {
+            store.createCollectionIfNotExists(collection, Map.of());
+
+            List<String> indexNames =
+                    client.listIndexes(
+                            ListIndexesReq.builder()
+                                    .collectionName(collection)
+                                    .fieldName(MilvusVectorStore.DEFAULT_METADATA_FIELD)
+                                    .build());
+            assertThat(indexNames).contains(MilvusVectorStore.DEFAULT_METADATA_INDEX_NAME);
+            assertThat(indexNames)
+                    .contains(
+                            metadataPathIndexName("user_id"),
+                            metadataPathIndexName("agent_id"),
+                            metadataPathIndexName("run_id"),
+                            metadataPathIndexName("actor_id"),
+                            metadataPathIndexName("category"),
+                            metadataPathIndexName("source"),
+                            metadataPathIndexName("score"));
+
+            DescribeIndexResp resp =
+                    client.describeIndex(
+                            DescribeIndexReq.builder()
+                                    .collectionName(collection)
+                                    .indexName(MilvusVectorStore.DEFAULT_METADATA_INDEX_NAME)
+                                    .build());
+            // The whole-field JSON index over the metadata column (not a per-key path index).
+            DescribeIndexResp.IndexDesc metadataIndex =
+                    resp.getIndexDescByIndexName(MilvusVectorStore.DEFAULT_METADATA_INDEX_NAME);
+            Assertions.assertEquals(
+                    MilvusVectorStore.DEFAULT_METADATA_FIELD, metadataIndex.getFieldName());
+            Assertions.assertEquals(IndexParam.IndexType.AUTOINDEX, metadataIndex.getIndexType());
+            assertThat(metadataIndex.getExtraParams())
+                    .containsEntry("json_path", MilvusVectorStore.DEFAULT_METADATA_FIELD)
+                    .containsEntry("json_cast_type", "JSON");
+
+            assertMetadataPathIndex(client, collection, "user_id", "VARCHAR");
+            assertMetadataPathIndex(client, collection, "actor_id", "VARCHAR");
+            assertMetadataPathIndex(client, collection, "category", "VARCHAR");
+            assertMetadataPathIndex(client, collection, "source", "VARCHAR");
+            assertMetadataPathIndex(client, collection, "score", "DOUBLE");
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+            client.close();
+        }
+    }
+
+    /** Verifies that a created collection is readable and that reads fail after it is deleted. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testCollectionManagement() throws Exception {
+        String collection = collectionName("collection_management");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+            Assertions.assertTrue(
+                    store.get(null, collection, null, 10, Collections.emptyMap()).isEmpty());
+
+            store.deleteCollection(collection);
+
+            Assertions.assertThrows(
+                    Exception.class,
+                    () -> store.get(null, collection, null, 10, Collections.emptyMap()));
+        } finally {
+            // Drops the collection if an assertion failed before the explicit delete above.
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /**
+     * Verifies that {@code createCollectionIfNotExists} reloads a released collection.
+     *
+     * <p>The collection is released out-of-band through the raw client before the call.
+     */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testCreateCollectionIfNotExistsLoadsReleasedCollection() throws Exception {
+        String collection = collectionName("load_released");
+        MilvusVectorStore store = openStore(collection);
+        MilvusClientV2 client =
+                new MilvusClientV2(
+                        ConnectConfig.builder().uri(System.getenv("MILVUS_URI")).build());
+
+        try {
+            createCollection(store, collection);
+            store.add(
+                    List.of(
+                            new Document(
+                                    "Milvus is a vector database",
+                                    Map.of("category", "database"),
+                                    "doc1")),
+                    collection,
+                    Collections.emptyMap());
+
+            client.releaseCollection(
+                    ReleaseCollectionReq.builder()
+                            .collectionName(collection)
+                            .async(Boolean.FALSE)
+                            .build());
+            Assertions.assertFalse(
+                    client.getLoadState(
+                            GetLoadStateReq.builder().collectionName(collection).build()));
+
+            createCollection(store, collection);
+            Assertions.assertTrue(
+                    client.getLoadState(
+                            GetLoadStateReq.builder().collectionName(collection).build()));
+
+            List<Document> loaded =
+                    store.get(List.of("doc1"), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, loaded.size());
+            assertDocument(
+                    loaded.get(0),
+                    "doc1",
+                    "Milvus is a vector database",
+                    Map.of("category", "database"));
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+            client.close();
+        }
+    }
+
+    /** Verifies add, get-all, get-by-id, delete-by-id and delete-all round trips. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testDocumentManagement() throws Exception {
+        String collection = collectionName("document_management");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            store.add(
+                    List.of(
+                            new Document(
+                                    "Milvus is a vector database",
+                                    Map.of("category", "database", "source", "test"),
+                                    "doc1"),
+                            new Document(
+                                    "Apache Flink Agents is an AI framework",
+                                    Map.of("category", "ai-agent", "source", "test"),
+                                    "doc2")),
+                    collection,
+                    Collections.emptyMap());
+
+            List<Document> all = store.get(null, collection, null, 10, Collections.emptyMap());
+            Assertions.assertEquals(2, all.size());
+            assertDocument(
+                    documentById(all, "doc1"),
+                    "doc1",
+                    "Milvus is a vector database",
+                    Map.of("category", "database", "source", "test"));
+            assertDocument(
+                    documentById(all, "doc2"),
+                    "doc2",
+                    "Apache Flink Agents is an AI framework",
+                    Map.of("category", "ai-agent", "source", "test"));
+
+            List<Document> byId =
+                    store.get(List.of("doc1"), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, byId.size());
+            assertDocument(
+                    byId.get(0),
+                    "doc1",
+                    "Milvus is a vector database",
+                    Map.of("category", "database", "source", "test"));
+
+            store.delete(List.of("doc1"), collection, null, Collections.emptyMap());
+            List<Document> remaining =
+                    store.get(null, collection, null, 10, Collections.emptyMap());
+            Assertions.assertEquals(1, remaining.size());
+            assertDocument(
+                    remaining.get(0),
+                    "doc2",
+                    "Apache Flink Agents is an AI framework",
+                    Map.of("category", "ai-agent", "source", "test"));
+
+            store.delete(null, collection, null, Collections.emptyMap());
+            Assertions.assertTrue(
+                    store.get(null, collection, null, 10, Collections.emptyMap()).isEmpty());
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /** Verifies that equality filters restrict both {@code get} and {@code queryEmbedding}. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testFiltersDsl() throws Exception {
+        String collection = collectionName("filters_dsl");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            store.add(
+                    List.of(
+                            new Document(
+                                    "Milvus is a vector database",
+                                    Map.of("category", "database", "user_id", "alice"),
+                                    "doc_alice"),
+                            new Document(
+                                    "Apache Flink Agents is an AI framework",
+                                    Map.of("category", "ai-agent", "user_id", "bob"),
+                                    "doc_bob")),
+                    collection,
+                    Collections.emptyMap());
+
+            List<Document> aliceOnly =
+                    store.get(
+                            null,
+                            collection,
+                            Map.of("user_id", "alice"),
+                            10,
+                            Collections.emptyMap());
+            Assertions.assertEquals(1, aliceOnly.size());
+            Assertions.assertEquals("doc_alice", aliceOnly.get(0).getId());
+
+            List<Document> aliceQueried =
+                    store.queryEmbedding(
+                            new float[] {1.0f, 0.0f, 0.0f, 0.0f, 0.0f},
+                            5,
+                            collection,
+                            Map.of("user_id", "alice"),
+                            Collections.emptyMap());
+            Assertions.assertFalse(aliceQueried.isEmpty());
+            Assertions.assertTrue(
+                    aliceQueried.stream().allMatch(d -> "doc_alice".equals(d.getId())));
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /** Verifies that documents without an id get a generated, retrievable id. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testAddGeneratesIdsForDocumentsWithoutIds() throws Exception {
+        String collection = collectionName("generated_ids");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            List<String> ids =
+                    store.add(
+                            List.of(
+                                    new Document(
+                                            "Milvus is a vector database",
+                                            Map.of("category", "database"),
+                                            null)),
+                            collection,
+                            Collections.emptyMap());
+
+            Assertions.assertEquals(1, ids.size());
+            Assertions.assertNotNull(ids.get(0));
+            Assertions.assertFalse(ids.get(0).isEmpty());
+
+            List<Document> stored =
+                    store.get(List.of(ids.get(0)), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, stored.size());
+            assertDocument(
+                    stored.get(0),
+                    ids.get(0),
+                    "Milvus is a vector database",
+                    Map.of("category", "database"));
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /** Verifies that a caller-supplied document id is kept and returned unchanged. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testAddPreservesCallerProvidedId() throws Exception {
+        String collection = collectionName("add_with_id");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            Document document =
+                    new Document(
+                            "Milvus is a vector database", Map.of("category", "database"), "doc1");
+            List<String> ids = store.add(List.of(document), collection, Collections.emptyMap());
+            Assertions.assertEquals(List.of("doc1"), ids);
+
+            List<Document> stored =
+                    store.get(List.of("doc1"), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, stored.size());
+            assertDocument(
+                    stored.get(0),
+                    "doc1",
+                    "Milvus is a vector database",
+                    Map.of("category", "database"));
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /**
+     * Verifies that a {@code collection} entry in the extra args is ignored.
+     *
+     * <p>With a {@code null} collection argument, the write must still go to the
+     * descriptor-configured collection rather than the one named in the extra args.
+     */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testExtraArgsCollectionDoesNotOverrideTargetCollection() throws Exception {
+        String collection = collectionName("target_collection");
+        String ignoredCollection = collectionName("ignored_collection");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            List<String> ids =
+                    store.add(
+                            List.of(
+                                    new Document(
+                                            "Milvus is a vector database",
+                                            Map.of("category", "database"),
+                                            "doc1")),
+                            null,
+                            Map.of("collection", ignoredCollection));
+            Assertions.assertEquals(List.of("doc1"), ids);
+
+            List<Document> stored =
+                    store.get(List.of("doc1"), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, stored.size());
+            assertDocument(
+                    stored.get(0),
+                    "doc1",
+                    "Milvus is a vector database",
+                    Map.of("category", "database"));
+        } finally {
+            dropCollectionQuietly(store, collection);
+            dropCollectionQuietly(store, ignoredCollection);
+            store.close();
+        }
+    }
+
+    /** Verifies that {@code update} replaces both content and metadata of an existing id. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testUpdateOverwritesExistingDocument() throws Exception {
+        String collection = collectionName("update_overwrite");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            Document original =
+                    new Document(
+                            "Milvus is a vector database", Map.of("category", "database"), "doc1");
+            store.add(List.of(original), collection, Collections.emptyMap());
+
+            Document rewritten =
+                    new Document(
+                            "Milvus stores dense vectors", Map.of("category", "updated"), "doc1");
+            store.update(List.of(rewritten), collection, Collections.emptyMap());
+
+            List<Document> after =
+                    store.get(List.of("doc1"), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, after.size());
+            Assertions.assertEquals("Milvus stores dense vectors", after.get(0).getContent());
+            Assertions.assertEquals("updated", after.get(0).getMetadata().get("category"));
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /** Verifies that search hits carry a score while plain {@code get} results do not. */
+    @Test
+    @EnabledIfEnvironmentVariable(named = "MILVUS_URI", matches = ".+")
+    void testQueryEmbeddingPopulatesScore() throws Exception {
+        String collection = collectionName("score_populated");
+        MilvusVectorStore store = openStore(collection);
+
+        try {
+            createCollection(store, collection);
+
+            store.add(
+                    List.of(
+                            new Document(
+                                    "Milvus is a vector database", Map.of("src", "test"), "doc1"),
+                            new Document(
+                                    "Apache Flink Agents is an AI framework",
+                                    Map.of("src", "test"),
+                                    "doc2")),
+                    collection,
+                    Collections.emptyMap());
+
+            VectorStoreQuery q =
+                    new VectorStoreQuery(
+                            "Milvus is a vector database", 5, collection, Collections.emptyMap());
+            List<Document> hits = store.query(q).getDocuments();
+            Assertions.assertFalse(hits.isEmpty());
+            Assertions.assertTrue(
+                    hits.stream().allMatch(d -> d.getScore() != null),
+                    "Every Milvus search hit should carry a score");
+
+            List<Document> byId =
+                    store.get(List.of("doc1"), collection, null, null, Collections.emptyMap());
+            Assertions.assertEquals(1, byId.size());
+            Assertions.assertNull(byId.get(0).getScore());
+        } finally {
+            dropCollectionQuietly(store, collection);
+            store.close();
+        }
+    }
+
+    /**
+     * Builds a descriptor for integration tests.
+     *
+     * <p>Test-only: STRONG consistency is used here so reads immediately see preceding writes
+     * within the same test method. Production should use the default BOUNDED consistency unless
+     * immediate read-after-write visibility is required.
+     */
+    private static ResourceDescriptor descriptor(String collection) {
+        return ResourceDescriptor.Builder.newBuilder(MilvusVectorStore.class.getName())
+                .addInitialArgument("embedding_model", "embeddingModel")
+                .addInitialArgument("uri", System.getenv("MILVUS_URI"))
+                .addInitialArgument("collection", collection)
+                .addInitialArgument("dims", 5)
+                .addInitialArgument("index_type", "AUTOINDEX")
+                .addInitialArgument("metric_type", "COSINE")
+                // Test-only: avoid timing-sensitive assertions after insert/update. Production
+                // should use the default BOUNDED consistency unless immediate read-after-write
+                // visibility is required.
+                .addInitialArgument("consistency_level", "STRONG")
+                .build();
+    }
+
+    /** Creates and opens a store bound to {@code collection}, backed by the mock embedding model. */
+    private static MilvusVectorStore openStore(String collection) throws Exception {
+        MilvusVectorStore store =
+                new MilvusVectorStore(
+                        descriptor(collection),
+                        ResourceContext.fromGetResource(MilvusVectorStoreTest::getResource));
+        store.open();
+        return store;
+    }
+
+    /** Creates {@code collection} with the store's default settings (no per-call overrides). */
+    private static void createCollection(MilvusVectorStore store, String collection) {
+        store.createCollectionIfNotExists(collection, Map.of());
+    }
+
+    /** Returns a unique collection name so concurrent or repeated runs never collide. */
+    private static String collectionName(String prefix) {
+        return "fa_milvus_" + prefix + "_" + UUID.randomUUID().toString().replace("-", "");
+    }
+
+    /** Finds the document with {@code id}, failing the test if it is absent. */
+    private static Document documentById(List<Document> documents, String id) {
+        return documents.stream()
+                .filter(d -> id.equals(d.getId()))
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Missing document " + id));
+    }
+
+    /** Asserts id, content and metadata of a fetched document, and that it carries no score. */
+    private static void assertDocument(
+            Document document, String id, String content, Map<String, Object> metadata) {
+        Assertions.assertEquals(id, document.getId());
+        Assertions.assertEquals(content, document.getContent());
+        Assertions.assertEquals(metadata, document.getMetadata());
+        Assertions.assertNull(document.getScore());
+    }
+
+    /** Asserts that the JSON path index for metadata {@code key} exists with {@code castType}. */
+    private static void assertMetadataPathIndex(
+            MilvusClientV2 client, String collection, String key, String castType) {
+        DescribeIndexResp resp =
+                client.describeIndex(
+                        DescribeIndexReq.builder()
+                                .collectionName(collection)
+                                .indexName(metadataPathIndexName(key))
+                                .build());
+        DescribeIndexResp.IndexDesc index =
+                resp.getIndexDescByIndexName(metadataPathIndexName(key));
+        Assertions.assertEquals(MilvusVectorStore.DEFAULT_METADATA_FIELD, index.getFieldName());
+        Assertions.assertEquals(IndexParam.IndexType.AUTOINDEX, index.getIndexType());
+        assertThat(index.getExtraParams())
+                .containsEntry(
+                        "json_path", MilvusVectorStore.DEFAULT_METADATA_FIELD + "[\"" + key + "\"]")
+                .containsEntry("json_cast_type", castType);
+    }
+
+    /** Mirrors the index naming scheme the store uses for per-key metadata path indexes. */
+    private static String metadataPathIndexName(String key) {
+        return MilvusVectorStore.DEFAULT_METADATA_FIELD + "_" + key + "_json_index";
+    }
+
+    /**
+     * Best-effort collection drop used in {@code finally} blocks.
+     *
+     * <p>A failing drop must never mask the test's own failure or skip closing the store/client.
+     * The drop may also legitimately fail: the collection may already be gone (deleted by the
+     * test) or may never have been created.
+     */
+    private static void dropCollectionQuietly(MilvusVectorStore store, String collection) {
+        try {
+            store.deleteCollection(collection);
+        } catch (Exception ignored) {
+            // Best-effort cleanup.
+        }
+    }
+
+    /**
+     * Resource resolver returning a mocked embedding model with fixed 5-dimensional vectors.
+     *
+     * <p>The two "Milvus" texts share one vector and the Flink text gets an orthogonal one, so
+     * similarity results are deterministic. {@code name} and {@code type} are ignored.
+     */
+    private static Resource getResource(String name, ResourceType type) {
+        BaseEmbeddingModelSetup embeddingModel = Mockito.mock(BaseEmbeddingModelSetup.class);
+        Mockito.when(embeddingModel.embed("Milvus is a vector database"))
+                .thenReturn(new float[] {1.0f, 0.0f, 0.0f, 0.0f, 0.0f});
+        Mockito.when(embeddingModel.embed("Milvus stores dense vectors"))
+                .thenReturn(new float[] {1.0f, 0.0f, 0.0f, 0.0f, 0.0f});
+        Mockito.when(embeddingModel.embed("Apache Flink Agents is an AI framework"))
+                .thenReturn(new float[] {0.0f, 1.0f, 0.0f, 0.0f, 0.0f});
+        return embeddingModel;
+    }
+}
diff --git a/integrations/vector-stores/pom.xml
b/integrations/vector-stores/pom.xml
index 4d4766d9..4e1655d0 100644
--- a/integrations/vector-stores/pom.xml
+++ b/integrations/vector-stores/pom.xml
@@ -32,8 +32,9 @@ under the License.
<modules>
<module>elasticsearch</module>
+ <module>milvus</module>
<module>opensearch</module>
<module>s3vectors</module>
</modules>
-</project>
\ No newline at end of file
+</project>
diff --git a/python/flink_agents/api/resource.py
b/python/flink_agents/api/resource.py
index f481edcf..3f20821e 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -345,5 +345,8 @@ class ResourceName:
# Amazon S3 Vectors
S3_VECTORS_VECTOR_STORE =
"org.apache.flink.agents.integrations.vectorstores.s3vectors.S3VectorsVectorStore"
+ # Milvus
+ MILVUS_VECTOR_STORE =
"org.apache.flink.agents.integrations.vectorstores.milvus.MilvusVectorStore"
+
# MCP resource names
MCP_SERVER = "flink_agents.integrations.mcp.mcp.MCPServer"
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_agent.py
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_agent.py
index 715b2379..18671bea 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_agent.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_agent.py
@@ -44,9 +44,42 @@ from flink_agents.api.vector_stores.vector_store import (
)
TEST_COLLECTION = "test_collection"
+DEFAULT_COLLECTION = "my_documents"
+EMBEDDING_MODEL_RESOURCE = "embedding_model"
+VECTOR_STORE_RESOURCE = "vector_store"
+BACKEND_ELASTICSEARCH = "ELASTICSEARCH"
+BACKEND_MILVUS = "MILVUS"
MAX_RETRIES_TIMES = 10
+def _selected_backend() -> str:
+ return os.environ.get("VECTOR_STORE_BACKEND",
BACKEND_ELASTICSEARCH).upper()
+
+
+def _vector_store_backend_from_resource(
+ vector_store: CollectionManageableVectorStore,
+) -> str:
+ # Java wrapper: reflect on the underlying Java class name
+ j_resource = getattr(vector_store, "_j_resource", None)
+ if j_resource is not None:
+ try:
+ class_name = j_resource.getClass().getName().lower()
+ if "milvus" in class_name:
+ return BACKEND_MILVUS
+ if "elasticsearch" in class_name:
+ return BACKEND_ELASTICSEARCH
+ except Exception:
+ pass
+
+ # Pure Python store: fallback to env var (cross-language test only
+ # uses Java wrappers, so this path should not be hit in practice)
+ return _selected_backend()
+
+
+def _backend_from_context(ctx: RunnerContext) -> str:
+ return ctx.short_term_memory.get("vector_store_backend") or
_selected_backend()
+
+
class VectorStoreCrossLanguageAgent(Agent):
"""Example agent demonstrating cross-language embedding model testing."""
@@ -88,14 +121,37 @@ class VectorStoreCrossLanguageAgent(Agent):
@staticmethod
def vector_store() -> ResourceDescriptor:
"""Vector store setup for knowledge base."""
- return ResourceDescriptor(
-
clazz=ResourceName.VectorStore.JAVA_WRAPPER_COLLECTION_MANAGEABLE_VECTOR_STORE,
-
java_clazz=ResourceName.VectorStore.Java.ELASTICSEARCH_VECTOR_STORE,
- embedding_model="embedding_model",
- host=os.environ.get("ES_HOST"),
- index="my_documents",
- dims=768,
- )
+ backend = _selected_backend()
+ collection = os.environ.get("VECTOR_STORE_COLLECTION",
DEFAULT_COLLECTION)
+
+ if backend == BACKEND_ELASTICSEARCH:
+ return ResourceDescriptor(
+
clazz=ResourceName.VectorStore.JAVA_WRAPPER_COLLECTION_MANAGEABLE_VECTOR_STORE,
+
java_clazz=ResourceName.VectorStore.Java.ELASTICSEARCH_VECTOR_STORE,
+ embedding_model=EMBEDDING_MODEL_RESOURCE,
+ host=os.environ.get("ES_HOST"),
+ index=collection,
+ dims=768,
+ )
+ if backend == BACKEND_MILVUS:
+ return ResourceDescriptor(
+
clazz=ResourceName.VectorStore.JAVA_WRAPPER_COLLECTION_MANAGEABLE_VECTOR_STORE,
+ java_clazz=ResourceName.VectorStore.Java.MILVUS_VECTOR_STORE,
+ embedding_model=EMBEDDING_MODEL_RESOURCE,
+ uri=os.environ.get("MILVUS_URI"),
+ collection=collection,
+ dims=768,
+ metric_type="COSINE",
+ index_type="AUTOINDEX",
+ # Test-only: this e2e checks read-after-write behavior
immediately.
+ # Production should use the default BOUNDED consistency unless
immediate
+ # read-after-write visibility is required.
+ consistency_level="STRONG",
+ metadata_index_keys=["category", "source"],
+ )
+
+ msg = f"Unsupported vector store backend: {backend}"
+ raise ValueError(msg)
@action(InputEvent.EVENT_TYPE)
@staticmethod
@@ -112,16 +168,23 @@ class VectorStoreCrossLanguageAgent(Agent):
is_initialized = stm.get("is_initialized") or False
if not is_initialized:
- print("[TEST] Initializing vector store...")
+ vector_store = ctx.get_resource(
+ VECTOR_STORE_RESOURCE, ResourceType.VECTOR_STORE
+ )
+ backend = _vector_store_backend_from_resource(vector_store)
+ stm.set("vector_store_backend", backend)
+ test_collection = os.environ.get(
+ "VECTOR_STORE_TEST_COLLECTION", TEST_COLLECTION
+ )
+
+ print(f"[TEST][{backend}] Initializing vector store...")
- vector_store = ctx.get_resource("vector_store",
ResourceType.VECTOR_STORE)
if isinstance(vector_store, CollectionManageableVectorStore):
vector_store.create_collection_if_not_exists(
- TEST_COLLECTION, metadata={"key1": "value1", "key2":
"value2"}
+ test_collection, metadata={"key1": "value1", "key2":
"value2"}
)
- vector_store.delete_collection(name=TEST_COLLECTION)
-
- print("[TEST] Vector store Collection Management PASSED")
+ vector_store.delete_collection(name=test_collection)
+ print(f"[TEST][{backend}] Vector store Collection Management
PASSED")
documents = [
Document(
@@ -140,6 +203,9 @@ class VectorStoreCrossLanguageAgent(Agent):
metadata={"category": "utility", "source": "test"},
),
]
+
+ collection = vector_store.collection or DEFAULT_COLLECTION
+ vector_store.create_collection_if_not_exists(collection)
vector_store.add(documents=documents)
assert len(vector_store.get()) == 3
@@ -152,7 +218,10 @@ class VectorStoreCrossLanguageAgent(Agent):
while len(vector_store.get()) > 2 and retry_time <
MAX_RETRIES_TIMES:
retry_time += 1
time.sleep(2)
- print(f"[TEST] Retrying to delete doc3,
retry_time={retry_time}")
+ print(
+ f"[TEST][{backend}] Vector store Retrying to delete
doc3, "
+ f"retry_time={retry_time}"
+ )
assert len(vector_store.get()) == 2
@@ -165,10 +234,10 @@ class VectorStoreCrossLanguageAgent(Agent):
== "Why did the cat sit on the computer? Because it wanted
to keep an eye on the mouse."
)
- print("[TEST] Vector store Document Management PASSED")
+ print(f"[TEST][{backend}] Vector store Document Management
PASSED")
# Verify VectorStoreQuery.filters survives the Python->Java
bridge.
- # Elasticsearch translates the unified-DSL filter to a
bool/must term
+ # Each backend translates the unified-DSL filter to a
bool/must term
# post-filter, so the result must contain only the doc tagged
# ``category=calculate`` (doc1).
filtered_query = VectorStoreQuery(
@@ -178,7 +247,6 @@ class VectorStoreCrossLanguageAgent(Agent):
filters={"category": "calculate"},
)
- # ES is eventually consistent; allow a few retries.
retry_time = 0
filtered_docs = vector_store.query(filtered_query).documents
while len(filtered_docs) != 1 and retry_time <
MAX_RETRIES_TIMES:
@@ -192,23 +260,27 @@ class VectorStoreCrossLanguageAgent(Agent):
)
assert filtered_docs[0].id == "doc1"
- print("[TEST] Vector store filter query PASSED")
+ print(f"[TEST][{backend}] Vector store filter query PASSED")
+ else:
+ msg = "vector_store must implement
CollectionManageableVectorStore"
+ raise TypeError(msg)
stm.set("is_initialized", True)
ctx.send_event(
- ContextRetrievalRequestEvent(query=input_text,
vector_store="vector_store")
+ ContextRetrievalRequestEvent(
+ query=input_text, vector_store=VECTOR_STORE_RESOURCE
+ )
)
@action(ContextRetrievalResponseEvent.EVENT_TYPE)
@staticmethod
- def contextRetrievalResponseEvent(
- event: Event, ctx: RunnerContext
- ) -> None:
+ def contextRetrievalResponseEvent(event: Event, ctx: RunnerContext) ->
None:
"""User defined action for processing context retrieval response.
In this action, we will test Vector store Context Retrieval.
"""
+ backend = _backend_from_context(ctx)
documents = ContextRetrievalResponseEvent.from_event(event).documents
assert documents is not None
@@ -221,7 +293,9 @@ class VectorStoreCrossLanguageAgent(Agent):
test_result = f"[PASS] retrieved_count={len(documents)},
first_doc_id={documents[0].id}, first_doc_preview={documents[0].content[:50]}"
print(
- f"[TEST] Vector store Context Retrieval PASSED,
first_doc_id={documents[0].id}, first_doc_preview={documents[0].content[:50]}"
+ f"[TEST][{backend}] Vector store Context Retrieval PASSED, "
+ f"first_doc_id={documents[0].id}, "
+ f"first_doc_preview={documents[0].content[:50]}"
)
ctx.send_event(OutputEvent(output=test_result))
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_test.py
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_test.py
index 36825a77..ac49eecd 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_test.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/vector_store_cross_language_test.py
@@ -16,7 +16,9 @@
# limitations under the License.
#################################################################################
import os
+import sys
import sysconfig
+import uuid
from pathlib import Path
import pytest
@@ -44,23 +46,31 @@ OLLAMA_MODEL = os.environ.get("OLLAMA_EMBEDDING_MODEL",
"nomic-embed-text:latest
os.environ["OLLAMA_EMBEDDING_MODEL"] = OLLAMA_MODEL
ES_HOST = os.environ.get("ES_HOST")
+MILVUS_URI = os.environ.get("MILVUS_URI")
client = pull_model(OLLAMA_MODEL)
+EMBEDDING_TYPES = ["JAVA", "PYTHON"]
os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
[email protected](
- client is None or ES_HOST is None,
- reason="Ollama client or Elasticsearch host is missing.",
-)
[email protected]("embedding_type", ["JAVA", "PYTHON"])
-def test_java_vector_store_integration(tmp_path: Path, embedding_type: str) ->
None:
- os.environ["EMBEDDING_TYPE"] = embedding_type
+def _run_vector_store_integration(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+ embedding_type: str,
+ backend: str,
+) -> None:
+ print(f"[TEST][{backend}] Vector store e2e embedding={embedding_type}")
+ monkeypatch.setenv("EMBEDDING_TYPE", embedding_type)
+ monkeypatch.setenv("VECTOR_STORE_BACKEND", backend)
+ suffix = uuid.uuid4().hex
+ monkeypatch.setenv("VECTOR_STORE_COLLECTION", f"my_documents_{suffix}")
+ monkeypatch.setenv("VECTOR_STORE_TEST_COLLECTION",
f"test_collection_{suffix}")
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)
+ env.set_python_executable(sys.executable)
# currently, bounded source is not supported due to runtime
implementation, so
# we use continuous file source here.
@@ -110,5 +120,30 @@ def test_java_vector_store_integration(tmp_path: Path,
embedding_type: str) -> N
with file.open() as f:
actual_result.extend(f.readlines())
+ assert len(actual_result) >= 2
assert "PASS" in actual_result[0]
assert "PASS" in actual_result[1]
+
+
[email protected](
+ client is None or ES_HOST is None,
+ reason="Embedding model client or Elasticsearch host is missing.",
+)
[email protected]("embedding_type", EMBEDDING_TYPES)
+def test_elasticsearch_vector_store_integration(
+ tmp_path: Path, monkeypatch: pytest.MonkeyPatch, embedding_type: str
+) -> None:
+ _run_vector_store_integration(
+ tmp_path, monkeypatch, embedding_type, "ELASTICSEARCH"
+ )
+
+
[email protected](
+ client is None or MILVUS_URI is None,
+ reason="Embedding model client or Milvus URI is missing.",
+)
[email protected]("embedding_type", EMBEDDING_TYPES)
+def test_milvus_vector_store_integration(
+ tmp_path: Path, monkeypatch: pytest.MonkeyPatch, embedding_type: str
+) -> None:
+ _run_vector_store_integration(tmp_path, monkeypatch, embedding_type,
"MILVUS")
diff --git a/python/flink_agents/runtime/java/java_vector_store.py
b/python/flink_agents/runtime/java/java_vector_store.py
index 76e54754..301d7a53 100644
--- a/python/flink_agents/runtime/java/java_vector_store.py
+++ b/python/flink_agents/runtime/java/java_vector_store.py
@@ -51,6 +51,10 @@ class
JavaVectorStoreImpl(JavaCollectionManageableVectorStore):
"""
# embedding_model are required parameters for BaseVectorStore
embedding_model = kwargs.pop("embedding_model", "")
+ # Elasticsearch/OpenSearch call their document container "index";
+ # expose it as BaseVectorStore's generic default collection.
+ if kwargs.get("collection") is None and kwargs.get("index") is not
None:
+ kwargs["collection"] = kwargs["index"]
super().__init__(embedding_model=embedding_model, **kwargs)
self._j_resource = j_resource
diff --git a/tools/docker/elasticsearch/docker-compose.yml
b/tools/docker/elasticsearch/docker-compose.yml
new file mode 100644
index 00000000..378f8522
--- /dev/null
+++ b/tools/docker/elasticsearch/docker-compose.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+services:
+ elasticsearch:
+ image: docker.elastic.co/elasticsearch/elasticsearch:8.19.0
+ container_name: flink-agents-elasticsearch
+ environment:
+ discovery.type: single-node
+ xpack.security.enabled: "false"
+ ES_JAVA_OPTS: "-Xms512m -Xmx512m"
+ ports:
+ - "9200:9200"
+ volumes:
+ - elasticsearch_data:/usr/share/elasticsearch/data
+ healthcheck:
+ test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health ||
exit 1"]
+ interval: 10s
+ timeout: 5s
+ retries: 10
+ start_period: 30s
+ networks:
+ - flink-agents-elasticsearch
+
+volumes:
+ elasticsearch_data:
+
+networks:
+ flink-agents-elasticsearch:
+ driver: bridge
diff --git a/tools/docker/milvus/docker-compose.yml
b/tools/docker/milvus/docker-compose.yml
new file mode 100644
index 00000000..9a45b471
--- /dev/null
+++ b/tools/docker/milvus/docker-compose.yml
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+services:
+ milvus-etcd:
+ image: quay.io/coreos/etcd:v3.5.5
+ container_name: flink-agents-milvus-etcd
+ environment:
+ - ETCD_AUTO_COMPACTION_MODE=revision
+ - ETCD_AUTO_COMPACTION_RETENTION=1000
+ - ETCD_QUOTA_BACKEND_BYTES=4294967296
+ - ETCD_SNAPSHOT_COUNT=50000
+ command: >
+ etcd
+ -advertise-client-urls=http://127.0.0.1:2379
+ -listen-client-urls http://0.0.0.0:2379
+ --data-dir /etcd
+ healthcheck:
+ test: ["CMD", "etcdctl", "--endpoints=http://127.0.0.1:2379",
"endpoint", "health"]
+ interval: 30s
+ timeout: 20s
+ retries: 3
+ volumes:
+ - milvus_etcd_data:/etcd
+ networks:
+ - flink-agents-milvus
+
+ milvus-minio:
+ image: minio/minio:RELEASE.2023-03-20T20-16-18Z
+ container_name: flink-agents-milvus-minio
+ environment:
+ MINIO_ACCESS_KEY: minioadmin
+ MINIO_SECRET_KEY: minioadmin
+ command: minio server /minio_data --console-address ":9001"
+ ports:
+ - "9000:9000"
+ - "9001:9001"
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+ interval: 30s
+ timeout: 20s
+ retries: 3
+ volumes:
+ - milvus_minio_data:/minio_data
+ networks:
+ - flink-agents-milvus
+
+ milvus-standalone:
+ image: milvusdb/milvus:v2.6.15
+ container_name: flink-agents-milvus-standalone
+ command: ["milvus", "run", "standalone"]
+ environment:
+ ETCD_ENDPOINTS: milvus-etcd:2379
+ MINIO_ADDRESS: milvus-minio:9000
+ ports:
+ - "19530:19530"
+ - "9091:9091"
+ volumes:
+ - milvus_data:/var/lib/milvus
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
+ interval: 30s
+ timeout: 20s
+ retries: 3
+ start_period: 90s
+ depends_on:
+ - milvus-etcd
+ - milvus-minio
+ networks:
+ - flink-agents-milvus
+
+volumes:
+ milvus_etcd_data:
+ driver: local
+ milvus_minio_data:
+ driver: local
+ milvus_data:
+ driver: local
+
+networks:
+ flink-agents-milvus:
+ driver: bridge