This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 58a5a2d134 [Feature][Connector-V2][Milvus] Sink writer flush by 
interval (#9961)
58a5a2d134 is described below

commit 58a5a2d134d26f448ea53f59da89f8b0116c78b2
Author: loupipalien <[email protected]>
AuthorDate: Sat Nov 8 21:34:47 2025 +0800

    [Feature][Connector-V2][Milvus] Sink writer flush by interval (#9961)
---
 docs/en/connector-v2/sink/Milvus.md                |  28 ++---
 docs/zh/connector-v2/sink/Milvus.md                |  45 +++++---
 .../seatunnel/milvus/config/MilvusSinkOptions.java |   1 +
 .../seatunnel/milvus/sink/MilvusSinkWriter.java    |  19 ++--
 .../e2e/connector/v2/milvus/MilvusIT.java          | 114 +++++++++++++++++++++
 .../test/resources/streaming-fake-to-milvus.conf   |  67 ++++++++++++
 6 files changed, 241 insertions(+), 33 deletions(-)

diff --git a/docs/en/connector-v2/sink/Milvus.md 
b/docs/en/connector-v2/sink/Milvus.md
index 0f1232ed0c..59f8920950 100644
--- a/docs/en/connector-v2/sink/Milvus.md
+++ b/docs/en/connector-v2/sink/Milvus.md
@@ -39,20 +39,20 @@ This Milvus sink connector write data to Milvus or Zilliz 
Cloud, it has the foll
 
 ## Sink Options
 
-|         Name         | Type    | Required |           Default            | 
Description                                                                     
   |
-|----------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------|
-| url                  | String  | Yes      | -                            | 
The URL to connect to Milvus or Zilliz Cloud.                                   
   |
-| token                | String  | Yes      | -                            | 
User:password                                                                   
   |
-| database             | String  | No       | -                            | 
Write data to which database, default is source database.                       
   |
-| schema_save_mode     | enum    | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | 
Auto create table when table not exist.                                         
   |
-| enable_auto_id       | boolean | No       | false                        | 
Primary key column enable autoId.                                               
   |
-| enable_upsert        | boolean | No       | false                        | 
Upsert data not insert.                                                         
   |
-| enable_dynamic_field | boolean | No       | true                         | 
Enable create table with dynamic field.                                         
   |
-| batch_size           | int     | No       | 1000                         | 
Write batch size.                                                               
   |
-| partition_key        | String  | No       |                              | 
Milvus partition key field                                                      
   |
-| create_index         | boolean | No       | false                        | 
Automatically create vector indexes for collection to improve query 
performance.   |
-| load_collection      | boolean | No       | false                        | 
Load collection into Milvus memory for immediate query availability.            
   |
-| collection_description | Map<String, String> | No | {}                   | 
Collection descriptions map where key is collection name and value is 
description. |                                         
+| Name                   | Type                | Required | Default            
          | Description                                                         
                                                                                
|
+|------------------------|---------------------|----------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                    | String              | Yes      | -                  
          | The URL to connect to Milvus or Zilliz Cloud.                       
                                                                                
|
+| token                  | String              | Yes      | -                  
          | User:password                                                       
                                                                                
|
+| database               | String              | No       | -                  
          | Write data to which database, default is source database.           
                                                                                
|
+| schema_save_mode       | enum                | No       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist.          
                                                                                
                   |
+| enable_auto_id         | boolean             | No       | false              
          | Primary key column enable autoId.                                   
                                                                                
|
+| enable_upsert          | boolean             | No       | false              
          | Use upsert instead of insert when writing data.                     
                                                                                
|
+| enable_dynamic_field   | boolean             | No       | true               
          | Enable create table with dynamic field.                             
                                                                                
|
+| batch_size             | int                 | No       | 1000               
          | Write batch size. A write flush is triggered when the number of buffered 
records reaches `batch_size` or when the `checkpoint.interval` elapses, whichever comes 
first. |
+| partition_key          | String              | No       |                    
          | Milvus partition key field                                          
                                                                                
|
+| create_index           | boolean             | No       | false              
          | Automatically create vector indexes for collection to improve query 
performance.                                                                    
|
+| load_collection        | boolean             | No       | false              
          | Load collection into Milvus memory for immediate query 
availability.                                                                   
             |
+| collection_description | Map<String, String> | No       | {}                 
          | Collection descriptions map where key is collection name and value 
is description.                                                                 
 |                                         
 
 ## Task Example
 
diff --git a/docs/zh/connector-v2/sink/Milvus.md 
b/docs/zh/connector-v2/sink/Milvus.md
index 37c1d467ee..b992c907fd 100644
--- a/docs/zh/connector-v2/sink/Milvus.md
+++ b/docs/zh/connector-v2/sink/Milvus.md
@@ -19,7 +19,7 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能
 
 ##数据类型映射
 
-|  Milvus数据类型   | SeaTunnel 数据类型 |
+| Milvus数据类型          | SeaTunnel 数据类型      |
 |---------------------|---------------------|
 | INT8                | TINYINT             |
 | INT16               | SMALLINT            |
@@ -39,20 +39,24 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能
 
 ## Sink 选项
 
-|         名字         | 类型    | 是否必传 |           默认值            | 描述            
                                   |
-|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
-| url                  | String  | 是      | -                            | 
连接到Milvus或Zilliz Cloud的URL。             |
-| token                | String  | 是      | -                            | 
用户:密码                                             |
-| database             | String  | 否       | -                            | 
将数据写入哪个数据库,默认为源数据库。 |
-| schema_save_mode     | enum    | 否       | CREATE_SCHEMA_WHEN_NOT_EXIST | 
当表不存在时自动创建表。                   |
-| enable_auto_id       | boolean | 否       | false                        | 
主键列启用autoId。                         |
-| enable_upsert        | boolean | 否       | false                        | 
是否启用upsert。                                   |
-| enable_dynamic_field | boolean | 否       | true                         | 
是否启用带动态字段的创建表。                   |
-| batch_size           | int     | 否       | 1000                         | 
写入批大小。                                         |
-| partition_key        | String  | 否       |                              | 
Milvus分区键字段                                |                                    
     
+| 名字                     | 类型                  | 是否必传 | 默认值                    
      | 描述                                                                  |
+|------------------------|---------------------|------|------------------------------|---------------------------------------------------------------------|
+| url                    | String              | 是    | -                      
      | 连接到Milvus或Zilliz Cloud的URL。                                         |
+| token                  | String              | 是    | -                      
      | 用户:密码                                                               |
+| database               | String              | 否    | -                      
      | 将数据写入哪个数据库,默认为源数据库。                                                 |
+| schema_save_mode       | enum                | 否    | 
CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。                                     
                   |
+| enable_auto_id         | boolean             | 否    | false                  
      | 主键列启用autoId。                                                        |
+| enable_upsert          | boolean             | 否    | false                  
      | 是否启用upsert。                                                         |
+| enable_dynamic_field   | boolean             | 否    | true                   
      | 是否启用带动态字段的创建表。                                                      |
+| batch_size             | int                 | 否    | 1000                   
      | 写入批大小。当缓冲记录数达到 `batch_size` 或时间达到 `checkpoint.interval` 时,将触发一次写入刷新 |
+| partition_key          | String              | 否    |                        
      | Milvus分区键字段                                                         |   
                                      
+| create_index           | boolean             | 否    | false                  
      | 自动为集合创建向量索引以提高查询性能                                                  |
+| load_collection        | boolean             | 否    | false                  
      | 将集合加载到 Milvus 内存中以便立即进行查询                                           |
+| collection_description | Map<String, String> | 否    | {}                     
      | 集合描述映射,其中键是集合名称,值是描述                                                |   
                                      
 
 ## 任务示例
 
+### 基础配置
 ```bash
 sink {
   Milvus {
@@ -63,6 +67,23 @@ sink {
 }
 ```
 
+### 带 Index 和 Loading 的高级配置
+```bash
+sink {
+  Milvus {
+    url = "http://127.0.0.1:19530"
+    token = "username:password"
+    batch_size = 1000
+    create_index = true
+    load_collection = true
+    collection_description = {
+      "user_vectors" = "User embedding vectors for recommendation"
+      "product_vectors" = "Product feature vectors for search"
+    }
+  }
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
index 1c7d00f3a9..4113be01b8 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
@@ -82,6 +82,7 @@ public class MilvusSinkOptions extends MilvusBaseOptions {
                     .intType()
                     .defaultValue(1000)
                     .withDescription("writer batch size");
+
     public static final Option<Integer> RATE_LIMIT =
             Options.key("rate_limit")
                     .intType()
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
index 98b2b46c3b..55402896ac 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -61,13 +61,7 @@ public class MilvusSinkWriter
     public void write(SeaTunnelRow element) {
         batchWriter.addToBatch(element);
         if (batchWriter.needFlush()) {
-            try {
-                // Flush the batch writer
-                batchWriter.flush();
-            } catch (Exception e) {
-                log.error("flush Milvus sink writer failed", e);
-                throw new 
MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e);
-            }
+            flush();
         }
     }
 
@@ -81,6 +75,7 @@ public class MilvusSinkWriter
      */
     @Override
     public Optional<MilvusCommitInfo> prepareCommit() throws IOException {
+        flush();
         return Optional.empty();
     }
 
@@ -110,4 +105,14 @@ public class MilvusSinkWriter
             throw new 
MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e);
         }
     }
+
+    private void flush() {
+        try {
+            // Flush the batch writer
+            batchWriter.flush();
+        } catch (Exception e) {
+            log.error("flush Milvus sink writer failed", e);
+            throw new 
MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e);
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 3aed4f1455..1ad1fc9882 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -58,6 +58,7 @@ import io.milvus.grpc.FieldSchema;
 import io.milvus.grpc.IndexDescription;
 import io.milvus.grpc.KeyValuePair;
 import io.milvus.grpc.MutationResult;
+import io.milvus.grpc.QueryResults;
 import io.milvus.param.ConnectParam;
 import io.milvus.param.IndexType;
 import io.milvus.param.MetricType;
@@ -69,6 +70,7 @@ import io.milvus.param.collection.FieldType;
 import io.milvus.param.collection.HasCollectionParam;
 import io.milvus.param.collection.LoadCollectionParam;
 import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.QueryParam;
 import io.milvus.param.index.CreateIndexParam;
 import io.milvus.param.index.DescribeIndexParam;
 import lombok.extern.slf4j.Slf4j;
@@ -82,6 +84,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -715,4 +719,114 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
 
         log.info("Index verification passed for collection: {}.{}", database, 
collection);
     }
+
+    @TestTemplate
+    public void testStreamingFakeToMilvus(TestContainer container)
+            throws IOException, InterruptedException {
+        // flush by checkpoint interval
+        String jobId = "1";
+        String database = "streaming_test";
+        String collection = "streaming_simple_example";
+        String vectorField = "book_intro";
+        int checkpointInterval = 30000;
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/streaming-fake-to-milvus.conf",
+                                jobId,
+                                "database=" + database,
+                                "collection=" + collection,
+                                "batch_size=3");
+                    } catch (IOException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // count write records
+        waitCollectionReady(database, collection, vectorField);
+        Awaitility.await()
+                .atMost(60, TimeUnit.SECONDS)
+                .pollInterval(2, TimeUnit.SECONDS)
+                .until(() -> countCollectionEntities(database, collection) >= 
9);
+        Assertions.assertEquals(9, countCollectionEntities(database, 
collection));
+        TimeUnit.MILLISECONDS.sleep(checkpointInterval);
+        Assertions.assertEquals(10, countCollectionEntities(database, 
collection));
+
+        // cancel jobs
+        container.cancelJob(jobId);
+    }
+
+    private void waitCollectionReady(
+            String databaseName, String collectionName, String 
vectorFieldName) {
+        // assert table exist
+        Awaitility.await()
+                .atMost(60, TimeUnit.SECONDS)
+                .pollInterval(2, TimeUnit.SECONDS)
+                .until(
+                        () -> {
+                            R<Boolean> hasCollectionResponse =
+                                    this.milvusClient.hasCollection(
+                                            HasCollectionParam.newBuilder()
+                                                    
.withDatabaseName(databaseName)
+                                                    
.withCollectionName(collectionName)
+                                                    .build());
+                            Assertions.assertEquals(
+                                    R.Status.Success.getCode(),
+                                    hasCollectionResponse.getStatus(),
+                                    
Optional.ofNullable(hasCollectionResponse.getException())
+                                            .map(Exception::getMessage)
+                                            .orElse(""));
+                            return hasCollectionResponse.getData();
+                        });
+
+        // create index
+        R<RpcStatus> createIndexResponse =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                .withDatabaseName(databaseName)
+                                .withCollectionName(collectionName)
+                                .withFieldName(vectorFieldName)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        Assertions.assertEquals(
+                R.Status.Success.getCode(),
+                createIndexResponse.getStatus(),
+                Optional.ofNullable(createIndexResponse.getException())
+                        .map(Exception::getMessage)
+                        .orElse(""));
+
+        // load collection
+        R<RpcStatus> loadCollectionResponse =
+                milvusClient.loadCollection(
+                        LoadCollectionParam.newBuilder()
+                                .withDatabaseName(databaseName)
+                                .withCollectionName(collectionName)
+                                .build());
+        Assertions.assertEquals(
+                R.Status.Success.getCode(),
+                loadCollectionResponse.getStatus(),
+                Optional.ofNullable(loadCollectionResponse.getException())
+                        .map(Exception::getMessage)
+                        .orElse(""));
+    }
+
+    private long countCollectionEntities(String databaseName, String 
collectionName) {
+        R<QueryResults> queryResults =
+                milvusClient.query(
+                        QueryParam.newBuilder()
+                                .withDatabaseName(databaseName)
+                                .withCollectionName(collectionName)
+                                
.withOutFields(Collections.singletonList("count(*)"))
+                                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), 
queryResults.getStatus());
+        return queryResults
+                .getData()
+                .getFieldsData(0)
+                .getScalars()
+                .getLongData()
+                .getDataList()
+                .get(0);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf
new file mode 100644
index 0000000000..4f7a8b9ab2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 30000
+}
+
+source {
+  FakeSource {
+      row.num = 10
+      vector.dimension= 4
+      schema = {
+           table = ${collection}
+           columns = [
+           {
+              name = book_id
+              type = bigint
+              nullable = false
+              defaultValue = 0
+              comment = "primary key id"
+           },
+           {
+              name = book_intro
+              type = float_vector
+              columnScale =4
+              comment = "vector"
+           },
+           {
+              name = book_title
+              type = string
+              nullable = true
+              comment = "topic"
+           }
+       ]
+        primaryKey {
+            name = book_id
+            columnNames = [book_id]
+        }
+      }
+  }
+}
+
+sink {
+   Milvus {
+     url = "http://milvus-e2e:19530"
+     token = "root:Milvus"
+     database = ${database}
+     enable_upsert = false
+     batch_size = ${batch_size}
+   }
+}
\ No newline at end of file

Reply via email to