This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bde235090b [Fix][Connector-V2] Fix load state check in 
MilvusSourceReader to consider partition-level status (#8937)
bde235090b is described below

commit bde235090b1758b2833d2570451e521acfee9483
Author: xiaochen <[email protected]>
AuthorDate: Wed Mar 19 21:55:49 2025 +0800

    [Fix][Connector-V2] Fix load state check in MilvusSourceReader to consider 
partition-level status (#8937)
---
 .../seatunnel/milvus/source/MilvusSource.java      |  10 +-
 .../milvus/source/MilvusSourceReader.java          |  16 ++-
 ...ertor.java => MilvusSourceSplitEnumerator.java} |   4 +-
 .../e2e/connector/v2/milvus/MilvusIT.java          | 156 ++++++++++++++++++++-
 .../milvus-to-milvus-with-partitionkey.conf        |  38 +++++
 5 files changed, 210 insertions(+), 14 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
index abb7e9c898..17112d0b75 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
@@ -42,9 +42,9 @@ public class MilvusSource
     private final ReadonlyConfig config;
     private final Map<TablePath, CatalogTable> sourceTables;
 
-    public MilvusSource(ReadonlyConfig sourceConfing) {
-        this.config = sourceConfing;
-        MilvusConvertUtils milvusConvertUtils = new 
MilvusConvertUtils(sourceConfing);
+    public MilvusSource(ReadonlyConfig sourceConfig) {
+        this.config = sourceConfig;
+        MilvusConvertUtils milvusConvertUtils = new 
MilvusConvertUtils(sourceConfig);
         this.sourceTables = milvusConvertUtils.getSourceTables();
     }
 
@@ -66,7 +66,7 @@ public class MilvusSource
     @Override
     public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws 
Exception {
-        return new MilvusSourceSplitEnumertor(context, config, sourceTables, 
null);
+        return new MilvusSourceSplitEnumerator(context, config, sourceTables, 
null);
     }
 
     @Override
@@ -74,7 +74,7 @@ public class MilvusSource
             SourceSplitEnumerator.Context<MilvusSourceSplit> context,
             MilvusSourceState checkpointState)
             throws Exception {
-        return new MilvusSourceSplitEnumertor(context, config, sourceTables, 
checkpointState);
+        return new MilvusSourceSplitEnumerator(context, config, sourceTables, 
checkpointState);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
index cd8b026124..316aa51488 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -162,12 +162,16 @@ public class MilvusSourceReader implements 
SourceReader<SeaTunnelRow, MilvusSour
                     MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
         }
 
-        R<GetLoadStateResponse> loadStateResponse =
-                client.getLoadState(
-                        GetLoadStateParam.newBuilder()
-                                .withDatabaseName(tablePath.getDatabaseName())
-                                .withCollectionName(tablePath.getTableName())
-                                .build());
+        GetLoadStateParam.Builder loadStateParam =
+                GetLoadStateParam.newBuilder()
+                        .withDatabaseName(tablePath.getDatabaseName())
+                        .withCollectionName(tablePath.getTableName());
+
+        if (StringUtils.isNotEmpty(partitionName)) {
+            
loadStateParam.withPartitionNames(Collections.singletonList(partitionName));
+        }
+
+        R<GetLoadStateResponse> loadStateResponse = 
client.getLoadState(loadStateParam.build());
         if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
             throw new MilvusConnectorException(
                     MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
similarity index 99%
rename from 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
rename to 
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
index 1c181baffc..e415b0cfff 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
@@ -48,7 +48,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 @Slf4j
-public class MilvusSourceSplitEnumertor
+public class MilvusSourceSplitEnumerator
         implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> 
{
 
     private final Map<TablePath, CatalogTable> tables;
@@ -60,7 +60,7 @@ public class MilvusSourceSplitEnumertor
 
     private final ReadonlyConfig config;
 
-    public MilvusSourceSplitEnumertor(
+    public MilvusSourceSplitEnumerator(
             Context<MilvusSourceSplit> context,
             ReadonlyConfig config,
             Map<TablePath, CatalogTable> sourceTables,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 55a70e3a4f..1e04e9266a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -97,6 +97,8 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
     private static final String COLLECTION_NAME = "simple_example";
     private static final String COLLECTION_NAME_1 = "simple_example_1";
     private static final String COLLECTION_NAME_2 = "simple_example_2";
+    private static final String COLLECTION_NAME_WITH_PARTITIONKEY =
+            "simple_example_with_partitionkey";
     private static final String ID_FIELD = "book_id";
     private static final String VECTOR_FIELD = "book_intro";
     private static final String VECTOR_FIELD2 = "book_kind";
@@ -243,6 +245,112 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
 
         log.info("Collection created");
 
+        // Define fields With Partition Key
+        List<FieldType> fieldsSchemaWithPartitionKey =
+                Arrays.asList(
+                        FieldType.newBuilder()
+                                .withName(ID_FIELD)
+                                .withDataType(DataType.Int64)
+                                .withPrimaryKey(true)
+                                .withAutoID(false)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD)
+                                .withDataType(DataType.FloatVector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD2)
+                                .withDataType(DataType.Float16Vector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD3)
+                                .withDataType(DataType.BinaryVector)
+                                .withDimension(VECTOR_DIM * 2)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD4)
+                                .withDataType(DataType.SparseFloatVector)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(TITLE_FIELD)
+                                .withDataType(DataType.VarChar)
+                                .withPartitionKey(true)
+                                .withMaxLength(64)
+                                .build());
+
+        // Create the collection with 3 fields
+        R<RpcStatus> ret2 =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .withFieldTypes(fieldsSchemaWithPartitionKey)
+                                .build());
+        if (ret2.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to create collection! Error: " 
+ ret.getMessage());
+        }
+
+        // Specify an index type on the vector field.
+        ret2 =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (ret2.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + 
ret.getMessage());
+        }
+
+        ret2 =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .withFieldName(VECTOR_FIELD2)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (ret2.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + 
ret.getMessage());
+        }
+        ret2 =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .withFieldName(VECTOR_FIELD3)
+                                .withIndexType(IndexType.BIN_FLAT)
+                                .withMetricType(MetricType.HAMMING)
+                                .build());
+        if (ret2.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + 
ret.getMessage());
+        }
+
+        ret2 =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .withFieldName(VECTOR_FIELD4)
+                                .withIndexType(IndexType.SPARSE_INVERTED_INDEX)
+                                .withMetricType(MetricType.IP)
+                                .build());
+        if (ret2.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + 
ret.getMessage());
+        }
+
+        // Call loadCollection() to enable automatically loading data into 
memory for searching
+        milvusClient.loadCollection(
+                LoadCollectionParam.newBuilder()
+                        .withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                        .build());
+
+        log.info("Collection created");
+
         // Insert 10 records into the collection
         List<JsonObject> rows = new ArrayList<>();
         for (long i = 1L; i <= 10; ++i) {
@@ -272,7 +380,16 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
                                 .withCollectionName(COLLECTION_NAME)
                                 .withRows(rows)
                                 .build());
-        if (insertRet.getStatus() != R.Status.Success.getCode()) {
+
+        R<MutationResult> insertRet2 =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .withRows(rows)
+                                .build());
+
+        if (insertRet.getStatus() != R.Status.Success.getCode()
+                || insertRet2.getStatus() != R.Status.Success.getCode()) {
             throw new RuntimeException("Failed to insert! Error: " + 
insertRet.getMessage());
         }
     }
@@ -322,6 +439,43 @@ public class MilvusIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertTrue(fileds.contains(TITLE_FIELD));
     }
 
+    @TestTemplate
+    public void testMilvusWithPartitionKey(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/milvus-to-milvus-with-partitionkey.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // assert table exist
+        R<Boolean> hasCollectionResponse =
+                this.milvusClient.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .build());
+        Assertions.assertTrue(hasCollectionResponse.getData());
+
+        // check table fields
+        R<DescribeCollectionResponse> describeCollectionResponseR =
+                this.milvusClient.describeCollection(
+                        DescribeCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+                                .build());
+
+        DescribeCollectionResponse data = 
describeCollectionResponseR.getData();
+        List<String> fileds =
+                data.getSchema().getFieldsList().stream()
+                        .map(FieldSchema::getName)
+                        .collect(Collectors.toList());
+        Assertions.assertTrue(fileds.contains(ID_FIELD));
+        Assertions.assertTrue(fileds.contains(VECTOR_FIELD));
+        Assertions.assertTrue(fileds.contains(VECTOR_FIELD2));
+        Assertions.assertTrue(fileds.contains(VECTOR_FIELD3));
+        Assertions.assertTrue(fileds.contains(VECTOR_FIELD4));
+        Assertions.assertTrue(fileds.contains(TITLE_FIELD));
+    }
+
     @TestTemplate
     public void testFakeToMilvus(TestContainer container) throws IOException, 
InterruptedException {
         Container.ExecResult execResult = 
container.executeJob("/fake-to-milvus.conf");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitionkey.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitionkey.conf
new file mode 100644
index 0000000000..2c1c45af8e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitionkey.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Milvus {
+    url = "http://milvus-e2e:19530";
+    token = "root:Milvus"
+    collection = "simple_example_with_partitionkey"
+  }
+}
+
+sink {
+   Milvus {
+     url = "http://milvus-e2e:19530";
+     token = "root:Milvus"
+     database="test"
+     collection = "simple_example_with_partitionkey"
+   }
+}
\ No newline at end of file

Reply via email to