This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 08f26538cd [Bug] [Connector-V2] Fix Milvus sink collection_name target 
handling (#10893)
08f26538cd is described below

commit 08f26538cd29567996e268046ace32b73c7cce8b
Author: Puneet Dixit <[email protected]>
AuthorDate: Sun May 17 19:28:10 2026 +0530

    [Bug] [Connector-V2] Fix Milvus sink collection_name target handling 
(#10893)
    
    Co-authored-by: Puneet Dixit <[email protected]>
---
 docs/en/connectors/sink/Milvus.md                  |   4 +-
 docs/zh/connectors/sink/Milvus.md                  |   4 +-
 .../seatunnel/milvus/config/MilvusBaseOptions.java |   1 +
 .../seatunnel/milvus/sink/MilvusSinkFactory.java   |  11 ++
 .../milvus/sink/MilvusSinkFactoryTest.java         | 157 +++++++++++++++++++++
 5 files changed, 175 insertions(+), 2 deletions(-)

diff --git a/docs/en/connectors/sink/Milvus.md 
b/docs/en/connectors/sink/Milvus.md
index a91ab5c69b..4868d3ba6f 100644
--- a/docs/en/connectors/sink/Milvus.md
+++ b/docs/en/connectors/sink/Milvus.md
@@ -44,6 +44,7 @@ This Milvus sink connector write data to Milvus or Zilliz 
Cloud, it has the foll
 | url                    | String              | Yes      | -                  
          | The URL to connect to Milvus or Zilliz Cloud.                       
                                                                                
|
 | token                  | String              | Yes      | -                  
          | User:password                                                       
                                                                                
|
 | database               | String              | No       | -                  
          | Write data to which database, default is source database.           
                                                                                
|
+| collection             | String              | No       | -                  
          | Write data to which collection, default is source table name. The 
deprecated `collection_name` key is accepted as an alias.                       
  |
 | schema_save_mode       | enum                | No       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist.          
                                                                                
                   |
 | enable_auto_id         | boolean             | No       | false              
          | Primary key column enable autoId.                                   
                                                                                
|
 | enable_upsert          | boolean             | No       | false              
          | Upsert data not insert.                                             
                                                                                
|
@@ -62,6 +63,7 @@ sink {
   Milvus {
     url = "http://127.0.0.1:19530";
     token = "username:password"
+    collection = "user_vectors"
     batch_size = 1000
   }
 }
@@ -86,4 +88,4 @@ sink {
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/sink/Milvus.md 
b/docs/zh/connectors/sink/Milvus.md
index bdb104b99c..c46a8e10be 100644
--- a/docs/zh/connectors/sink/Milvus.md
+++ b/docs/zh/connectors/sink/Milvus.md
@@ -44,6 +44,7 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能
 | url                    | String              | 是    | -                      
      | 连接到Milvus或Zilliz Cloud的URL。                                         |
 | token                  | String              | 是    | -                      
      | 用户:密码                                                               |
 | database               | String              | 否    | -                      
      | 将数据写入哪个数据库,默认为源数据库。                                                 |
+| collection             | String              | 否    | -                      
      | 将数据写入哪个集合,默认为源表名。兼容旧配置键 `collection_name`。                              
|
 | schema_save_mode       | enum                | 否    | 
CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。                                     
                   |
 | enable_auto_id         | boolean             | 否    | false                  
      | 主键列启用autoId。                                                        |
 | enable_upsert          | boolean             | 否    | false                  
      | 是否启用upsert。                                                         |
@@ -62,6 +63,7 @@ sink {
   Milvus {
     url = "http://127.0.0.1:19530";
     token = "username:password"
+    collection = "user_vectors"
     batch_size = 1000
   }
 }
@@ -86,4 +88,4 @@ sink {
 
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
index 77c7462967..e6d88c9f21 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
@@ -34,6 +34,7 @@ public abstract class MilvusBaseOptions {
             Options.key("collection")
                     .stringType()
                     .noDefaultValue()
+                    .withFallbackKeys("collection_name")
                     .withDescription("Milvus collection");
 
     public static final Option<String> TOKEN =
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
index 9916e5d515..8cfab9d139 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -44,15 +44,26 @@ public class MilvusSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(MilvusSinkOptions.URL, MilvusSinkOptions.TOKEN)
                 .optional(
+                        MilvusSinkOptions.DATABASE,
+                        MilvusSinkOptions.COLLECTION,
+                        MilvusSinkOptions.COLLECTION_DESCRIPTION,
+                        MilvusSinkOptions.PARTITION_KEY,
                         MilvusSinkOptions.ENABLE_UPSERT,
                         MilvusSinkOptions.ENABLE_DYNAMIC_FIELD,
                         MilvusSinkOptions.ENABLE_AUTO_ID,
+                        MilvusSinkOptions.BATCH_SIZE,
+                        MilvusSinkOptions.RATE_LIMIT,
+                        MilvusSinkOptions.LOAD_COLLECTION,
+                        MilvusSinkOptions.CREATE_INDEX,
                         MilvusSinkOptions.SCHEMA_SAVE_MODE,
                         MilvusSinkOptions.DATA_SAVE_MODE)
                 .build();
     }
 
     public TableSink createSink(TableSinkFactoryContext context) {
+        if (context == null) {
+            return () -> new MilvusSink(null, null);
+        }
         ReadonlyConfig config = context.getOptions();
         CatalogTable catalogTable = renameCatalogTable(config, 
context.getCatalogTable());
         return () -> new MilvusSink(config, catalogTable);
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactoryTest.java
 
b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactoryTest.java
new file mode 100644
index 0000000000..1b5542366e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactoryTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
+import 
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MilvusSinkFactoryTest {
+
+    private final MilvusSinkFactory factory = new MilvusSinkFactory();
+
+    @Test
+    void testCollectionNameAliasUsesTargetCollection() {
+        SeaTunnelSink<?, ?, ?, ?> sink =
+                createSeaTunnelSink(
+                        config(
+                                "collection_name",
+                                "user_data",
+                                MilvusSinkOptions.DATABASE.key(),
+                                "data_goverment_test"));
+
+        CatalogTable writeCatalogTable = writeCatalogTable(sink);
+        assertEquals("data_goverment_test", 
writeCatalogTable.getTablePath().getDatabaseName());
+        assertEquals("user_data", 
writeCatalogTable.getTablePath().getTableName());
+    }
+
+    @Test
+    void testCollectionUsesTargetCollection() {
+        SeaTunnelSink<?, ?, ?, ?> sink =
+                createSeaTunnelSink(config(MilvusSinkOptions.COLLECTION.key(), 
"user_data"));
+
+        CatalogTable writeCatalogTable = writeCatalogTable(sink);
+        assertEquals("source_database", 
writeCatalogTable.getTablePath().getDatabaseName());
+        assertEquals("user_data", 
writeCatalogTable.getTablePath().getTableName());
+    }
+
+    @Test
+    void testCollectionTakesPrecedenceOverCollectionNameAlias() {
+        SeaTunnelSink<?, ?, ?, ?> sink =
+                createSeaTunnelSink(
+                        config(
+                                MilvusSinkOptions.COLLECTION.key(),
+                                "canonical_collection",
+                                "collection_name",
+                                "alias_collection"));
+
+        assertEquals("canonical_collection", 
writeCatalogTable(sink).getTablePath().getTableName());
+    }
+
+    @Test
+    void testCreateSinkAllowsNullContextForFallbackProbe() {
+        TableSink<?, ?, ?, ?> tableSink = assertDoesNotThrow(() -> 
factory.createSink(null));
+        assertNotNull(tableSink);
+    }
+
+    @Test
+    void testOptionRuleIncludesSinkOptions() {
+        List<Option<?>> declaredOptions = 
declaredOptions(factory.optionRule());
+
+        assertTrue(declaredOptions.contains(MilvusSinkOptions.DATABASE));
+        assertTrue(declaredOptions.contains(MilvusSinkOptions.COLLECTION));
+        assertTrue(declaredOptions.contains(MilvusSinkOptions.BATCH_SIZE));
+        assertTrue(declaredOptions.contains(MilvusSinkOptions.PARTITION_KEY));
+        
assertTrue(declaredOptions.contains(MilvusSinkOptions.COLLECTION_DESCRIPTION));
+        
assertTrue(declaredOptions.contains(MilvusSinkOptions.LOAD_COLLECTION));
+        assertTrue(declaredOptions.contains(MilvusSinkOptions.CREATE_INDEX));
+        assertTrue(declaredOptions.contains(MilvusSinkOptions.RATE_LIMIT));
+    }
+
+    private SeaTunnelSink<?, ?, ?, ?> createSeaTunnelSink(Map<String, Object> 
options) {
+        TableSinkFactoryContext context =
+                new TableSinkFactoryContext(
+                        catalogTable(),
+                        ReadonlyConfig.fromMap(options),
+                        Thread.currentThread().getContextClassLoader());
+        return factory.createSink(context).createSink();
+    }
+
+    private CatalogTable writeCatalogTable(SeaTunnelSink<?, ?, ?, ?> sink) {
+        return sink.getWriteCatalogTable()
+                .orElseThrow(() -> new AssertionError("Expected sink write 
catalog table"));
+    }
+
+    private Map<String, Object> config(Object... keysAndValues) {
+        Map<String, Object> options = new HashMap<>();
+        options.put(MilvusSinkOptions.URL.key(), "http://localhost:19530";);
+        options.put(MilvusSinkOptions.TOKEN.key(), "root:Milvus");
+        for (int i = 0; i < keysAndValues.length; i += 2) {
+            options.put((String) keysAndValues[i], keysAndValues[i + 1]);
+        }
+        return options;
+    }
+
+    private CatalogTable catalogTable() {
+        List<Column> columns =
+                Arrays.asList(
+                        PhysicalColumn.of(
+                                "uniq_id", BasicType.STRING_TYPE, 0L, false, 
null, "Unique id"),
+                        PhysicalColumn.of(
+                                "profile", BasicType.STRING_TYPE, 0L, true, 
null, "Profile"));
+
+        return CatalogTable.of(
+                TableIdentifier.of("S3File", "source_database", "raw_data"),
+                TableSchema.builder().columns(columns).build(),
+                new HashMap<>(),
+                Arrays.asList(),
+                "Source table");
+    }
+
+    private List<Option<?>> declaredOptions(OptionRule rule) {
+        return Stream.concat(
+                        rule.getRequiredOptions().stream()
+                                .flatMap(option -> 
option.getOptions().stream()),
+                        rule.getOptionalOptions().stream())
+                .collect(Collectors.toList());
+    }
+}

Reply via email to