This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 08f26538cd [Bug] [Connector-V2] Fix Milvus sink collection_name target handling (#10893)
08f26538cd is described below
commit 08f26538cd29567996e268046ace32b73c7cce8b
Author: Puneet Dixit <[email protected]>
AuthorDate: Sun May 17 19:28:10 2026 +0530
[Bug] [Connector-V2] Fix Milvus sink collection_name target handling (#10893)
Co-authored-by: Puneet Dixit <[email protected]>
---
docs/en/connectors/sink/Milvus.md | 4 +-
docs/zh/connectors/sink/Milvus.md | 4 +-
.../seatunnel/milvus/config/MilvusBaseOptions.java | 1 +
.../seatunnel/milvus/sink/MilvusSinkFactory.java | 11 ++
.../milvus/sink/MilvusSinkFactoryTest.java | 157 +++++++++++++++++++++
5 files changed, 175 insertions(+), 2 deletions(-)
diff --git a/docs/en/connectors/sink/Milvus.md b/docs/en/connectors/sink/Milvus.md
index a91ab5c69b..4868d3ba6f 100644
--- a/docs/en/connectors/sink/Milvus.md
+++ b/docs/en/connectors/sink/Milvus.md
@@ -44,6 +44,7 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | User:password |
| database | String | No | - | Write data to which database, default is source database. |
+| collection | String | No | - | Write data to which collection, default is source table name. The deprecated `collection_name` key is accepted as an alias. |
| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. |
| enable_auto_id | boolean | No | false | Primary key column enable autoId. |
| enable_upsert | boolean | No | false | Upsert data not insert. |
@@ -62,6 +63,7 @@ sink {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
+ collection = "user_vectors"
batch_size = 1000
}
}
@@ -86,4 +88,4 @@ sink {
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/sink/Milvus.md b/docs/zh/connectors/sink/Milvus.md
index bdb104b99c..c46a8e10be 100644
--- a/docs/zh/connectors/sink/Milvus.md
+++ b/docs/zh/connectors/sink/Milvus.md
@@ -44,6 +44,7 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能
| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 |
| token | String | 是 | - | 用户:密码 |
| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 |
+| collection | String | 否 | - | 将数据写入哪个集合,默认为源表名。兼容旧配置键 `collection_name`。 |
| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 |
| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 |
| enable_upsert | boolean | 否 | false | 是否启用upsert。 |
@@ -62,6 +63,7 @@ sink {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
+ collection = "user_vectors"
batch_size = 1000
}
}
@@ -86,4 +88,4 @@ sink {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
index 77c7462967..e6d88c9f21 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
@@ -34,6 +34,7 @@ public abstract class MilvusBaseOptions {
Options.key("collection")
.stringType()
.noDefaultValue()
+ .withFallbackKeys("collection_name")
.withDescription("Milvus collection");
public static final Option<String> TOKEN =
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
index 9916e5d515..8cfab9d139 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -44,15 +44,26 @@ public class MilvusSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(MilvusSinkOptions.URL, MilvusSinkOptions.TOKEN)
.optional(
+ MilvusSinkOptions.DATABASE,
+ MilvusSinkOptions.COLLECTION,
+ MilvusSinkOptions.COLLECTION_DESCRIPTION,
+ MilvusSinkOptions.PARTITION_KEY,
MilvusSinkOptions.ENABLE_UPSERT,
MilvusSinkOptions.ENABLE_DYNAMIC_FIELD,
MilvusSinkOptions.ENABLE_AUTO_ID,
+ MilvusSinkOptions.BATCH_SIZE,
+ MilvusSinkOptions.RATE_LIMIT,
+ MilvusSinkOptions.LOAD_COLLECTION,
+ MilvusSinkOptions.CREATE_INDEX,
MilvusSinkOptions.SCHEMA_SAVE_MODE,
MilvusSinkOptions.DATA_SAVE_MODE)
.build();
}
public TableSink createSink(TableSinkFactoryContext context) {
+ if (context == null) {
+ return () -> new MilvusSink(null, null);
+ }
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = renameCatalogTable(config,
context.getCatalogTable());
return () -> new MilvusSink(config, catalogTable);
diff --git a/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactoryTest.java b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactoryTest.java
new file mode 100644
index 0000000000..1b5542366e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactoryTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MilvusSinkFactoryTest {
+
+ private final MilvusSinkFactory factory = new MilvusSinkFactory();
+
+ @Test
+ void testCollectionNameAliasUsesTargetCollection() {
+ SeaTunnelSink<?, ?, ?, ?> sink =
+ createSeaTunnelSink(
+ config(
+ "collection_name",
+ "user_data",
+ MilvusSinkOptions.DATABASE.key(),
+ "data_goverment_test"));
+
+ CatalogTable writeCatalogTable = writeCatalogTable(sink);
+ assertEquals("data_goverment_test",
writeCatalogTable.getTablePath().getDatabaseName());
+ assertEquals("user_data",
writeCatalogTable.getTablePath().getTableName());
+ }
+
+ @Test
+ void testCollectionUsesTargetCollection() {
+ SeaTunnelSink<?, ?, ?, ?> sink =
+ createSeaTunnelSink(config(MilvusSinkOptions.COLLECTION.key(),
"user_data"));
+
+ CatalogTable writeCatalogTable = writeCatalogTable(sink);
+ assertEquals("source_database",
writeCatalogTable.getTablePath().getDatabaseName());
+ assertEquals("user_data",
writeCatalogTable.getTablePath().getTableName());
+ }
+
+ @Test
+ void testCollectionTakesPrecedenceOverCollectionNameAlias() {
+ SeaTunnelSink<?, ?, ?, ?> sink =
+ createSeaTunnelSink(
+ config(
+ MilvusSinkOptions.COLLECTION.key(),
+ "canonical_collection",
+ "collection_name",
+ "alias_collection"));
+
+ assertEquals("canonical_collection",
writeCatalogTable(sink).getTablePath().getTableName());
+ }
+
+ @Test
+ void testCreateSinkAllowsNullContextForFallbackProbe() {
+ TableSink<?, ?, ?, ?> tableSink = assertDoesNotThrow(() ->
factory.createSink(null));
+ assertNotNull(tableSink);
+ }
+
+ @Test
+ void testOptionRuleIncludesSinkOptions() {
+ List<Option<?>> declaredOptions =
declaredOptions(factory.optionRule());
+
+ assertTrue(declaredOptions.contains(MilvusSinkOptions.DATABASE));
+ assertTrue(declaredOptions.contains(MilvusSinkOptions.COLLECTION));
+ assertTrue(declaredOptions.contains(MilvusSinkOptions.BATCH_SIZE));
+ assertTrue(declaredOptions.contains(MilvusSinkOptions.PARTITION_KEY));
+
assertTrue(declaredOptions.contains(MilvusSinkOptions.COLLECTION_DESCRIPTION));
+
assertTrue(declaredOptions.contains(MilvusSinkOptions.LOAD_COLLECTION));
+ assertTrue(declaredOptions.contains(MilvusSinkOptions.CREATE_INDEX));
+ assertTrue(declaredOptions.contains(MilvusSinkOptions.RATE_LIMIT));
+ }
+
+ private SeaTunnelSink<?, ?, ?, ?> createSeaTunnelSink(Map<String, Object>
options) {
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(
+ catalogTable(),
+ ReadonlyConfig.fromMap(options),
+ Thread.currentThread().getContextClassLoader());
+ return factory.createSink(context).createSink();
+ }
+
+ private CatalogTable writeCatalogTable(SeaTunnelSink<?, ?, ?, ?> sink) {
+ return sink.getWriteCatalogTable()
+ .orElseThrow(() -> new AssertionError("Expected sink write
catalog table"));
+ }
+
+ private Map<String, Object> config(Object... keysAndValues) {
+ Map<String, Object> options = new HashMap<>();
+ options.put(MilvusSinkOptions.URL.key(), "http://localhost:19530");
+ options.put(MilvusSinkOptions.TOKEN.key(), "root:Milvus");
+ for (int i = 0; i < keysAndValues.length; i += 2) {
+ options.put((String) keysAndValues[i], keysAndValues[i + 1]);
+ }
+ return options;
+ }
+
+ private CatalogTable catalogTable() {
+ List<Column> columns =
+ Arrays.asList(
+ PhysicalColumn.of(
+ "uniq_id", BasicType.STRING_TYPE, 0L, false,
null, "Unique id"),
+ PhysicalColumn.of(
+ "profile", BasicType.STRING_TYPE, 0L, true,
null, "Profile"));
+
+ return CatalogTable.of(
+ TableIdentifier.of("S3File", "source_database", "raw_data"),
+ TableSchema.builder().columns(columns).build(),
+ new HashMap<>(),
+ Arrays.asList(),
+ "Source table");
+ }
+
+ private List<Option<?>> declaredOptions(OptionRule rule) {
+ return Stream.concat(
+ rule.getRequiredOptions().stream()
+ .flatMap(option ->
option.getOptions().stream()),
+ rule.getOptionalOptions().stream())
+ .collect(Collectors.toList());
+ }
+}