This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bde235090b [Fix][Connector-V2] Fix load state check in
MilvusSourceReader to consider partition-level status (#8937)
bde235090b is described below
commit bde235090b1758b2833d2570451e521acfee9483
Author: xiaochen <[email protected]>
AuthorDate: Wed Mar 19 21:55:49 2025 +0800
[Fix][Connector-V2] Fix load state check in MilvusSourceReader to consider
partition-level status (#8937)
---
.../seatunnel/milvus/source/MilvusSource.java | 10 +-
.../milvus/source/MilvusSourceReader.java | 16 ++-
...ertor.java => MilvusSourceSplitEnumerator.java} | 4 +-
.../e2e/connector/v2/milvus/MilvusIT.java | 156 ++++++++++++++++++++-
.../milvus-to-milvus-with-partitionkey.conf | 38 +++++
5 files changed, 210 insertions(+), 14 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
index abb7e9c898..17112d0b75 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
@@ -42,9 +42,9 @@ public class MilvusSource
private final ReadonlyConfig config;
private final Map<TablePath, CatalogTable> sourceTables;
- public MilvusSource(ReadonlyConfig sourceConfing) {
- this.config = sourceConfing;
- MilvusConvertUtils milvusConvertUtils = new
MilvusConvertUtils(sourceConfing);
+ public MilvusSource(ReadonlyConfig sourceConfig) {
+ this.config = sourceConfig;
+ MilvusConvertUtils milvusConvertUtils = new
MilvusConvertUtils(sourceConfig);
this.sourceTables = milvusConvertUtils.getSourceTables();
}
@@ -66,7 +66,7 @@ public class MilvusSource
@Override
public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState>
createEnumerator(
SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws
Exception {
- return new MilvusSourceSplitEnumertor(context, config, sourceTables,
null);
+ return new MilvusSourceSplitEnumerator(context, config, sourceTables,
null);
}
@Override
@@ -74,7 +74,7 @@ public class MilvusSource
SourceSplitEnumerator.Context<MilvusSourceSplit> context,
MilvusSourceState checkpointState)
throws Exception {
- return new MilvusSourceSplitEnumertor(context, config, sourceTables,
checkpointState);
+ return new MilvusSourceSplitEnumerator(context, config, sourceTables,
checkpointState);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
index cd8b026124..316aa51488 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -162,12 +162,16 @@ public class MilvusSourceReader implements
SourceReader<SeaTunnelRow, MilvusSour
MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
}
- R<GetLoadStateResponse> loadStateResponse =
- client.getLoadState(
- GetLoadStateParam.newBuilder()
- .withDatabaseName(tablePath.getDatabaseName())
- .withCollectionName(tablePath.getTableName())
- .build());
+ GetLoadStateParam.Builder loadStateParam =
+ GetLoadStateParam.newBuilder()
+ .withDatabaseName(tablePath.getDatabaseName())
+ .withCollectionName(tablePath.getTableName());
+
+ if (StringUtils.isNotEmpty(partitionName)) {
+
loadStateParam.withPartitionNames(Collections.singletonList(partitionName));
+ }
+
+ R<GetLoadStateResponse> loadStateResponse =
client.getLoadState(loadStateParam.build());
if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
throw new MilvusConnectorException(
MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
similarity index 99%
rename from
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
rename to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
index 1c181baffc..e415b0cfff 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
@@ -48,7 +48,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
-public class MilvusSourceSplitEnumertor
+public class MilvusSourceSplitEnumerator
implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState>
{
private final Map<TablePath, CatalogTable> tables;
@@ -60,7 +60,7 @@ public class MilvusSourceSplitEnumertor
private final ReadonlyConfig config;
- public MilvusSourceSplitEnumertor(
+ public MilvusSourceSplitEnumerator(
Context<MilvusSourceSplit> context,
ReadonlyConfig config,
Map<TablePath, CatalogTable> sourceTables,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 55a70e3a4f..1e04e9266a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -97,6 +97,8 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
private static final String COLLECTION_NAME = "simple_example";
private static final String COLLECTION_NAME_1 = "simple_example_1";
private static final String COLLECTION_NAME_2 = "simple_example_2";
+ private static final String COLLECTION_NAME_WITH_PARTITIONKEY =
+ "simple_example_with_partitionkey";
private static final String ID_FIELD = "book_id";
private static final String VECTOR_FIELD = "book_intro";
private static final String VECTOR_FIELD2 = "book_kind";
@@ -243,6 +245,112 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
log.info("Collection created");
+ // Define fields, using the title field as the partition key
+ List<FieldType> fieldsSchemaWithPartitionKey =
+ Arrays.asList(
+ FieldType.newBuilder()
+ .withName(ID_FIELD)
+ .withDataType(DataType.Int64)
+ .withPrimaryKey(true)
+ .withAutoID(false)
+ .build(),
+ FieldType.newBuilder()
+ .withName(VECTOR_FIELD)
+ .withDataType(DataType.FloatVector)
+ .withDimension(VECTOR_DIM)
+ .build(),
+ FieldType.newBuilder()
+ .withName(VECTOR_FIELD2)
+ .withDataType(DataType.Float16Vector)
+ .withDimension(VECTOR_DIM)
+ .build(),
+ FieldType.newBuilder()
+ .withName(VECTOR_FIELD3)
+ .withDataType(DataType.BinaryVector)
+ .withDimension(VECTOR_DIM * 2)
+ .build(),
+ FieldType.newBuilder()
+ .withName(VECTOR_FIELD4)
+ .withDataType(DataType.SparseFloatVector)
+ .build(),
+ FieldType.newBuilder()
+ .withName(TITLE_FIELD)
+ .withDataType(DataType.VarChar)
+ .withPartitionKey(true)
+ .withMaxLength(64)
+ .build());
+
+ // Create the collection with the fields defined above
+ R<RpcStatus> ret2 =
+ milvusClient.createCollection(
+ CreateCollectionParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .withFieldTypes(fieldsSchemaWithPartitionKey)
+ .build());
+ if (ret2.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException("Failed to create collection! Error: "
+ ret.getMessage());
+ }
+
+ // Specify an index type on the vector field.
+ ret2 =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .withFieldName(VECTOR_FIELD)
+ .withIndexType(IndexType.FLAT)
+ .withMetricType(MetricType.L2)
+ .build());
+ if (ret2.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: " +
ret.getMessage());
+ }
+
+ ret2 =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .withFieldName(VECTOR_FIELD2)
+ .withIndexType(IndexType.FLAT)
+ .withMetricType(MetricType.L2)
+ .build());
+ if (ret2.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: " +
ret.getMessage());
+ }
+ ret2 =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .withFieldName(VECTOR_FIELD3)
+ .withIndexType(IndexType.BIN_FLAT)
+ .withMetricType(MetricType.HAMMING)
+ .build());
+ if (ret2.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: " +
ret.getMessage());
+ }
+
+ ret2 =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .withFieldName(VECTOR_FIELD4)
+ .withIndexType(IndexType.SPARSE_INVERTED_INDEX)
+ .withMetricType(MetricType.IP)
+ .build());
+ if (ret2.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: " +
ret.getMessage());
+ }
+
+ // Call loadCollection() to enable automatically loading data into
memory for searching
+ milvusClient.loadCollection(
+ LoadCollectionParam.newBuilder()
+ .withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .build());
+
+ log.info("Collection created");
+
// Insert 10 records into the collection
List<JsonObject> rows = new ArrayList<>();
for (long i = 1L; i <= 10; ++i) {
@@ -272,7 +380,16 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
.withCollectionName(COLLECTION_NAME)
.withRows(rows)
.build());
- if (insertRet.getStatus() != R.Status.Success.getCode()) {
+
+ R<MutationResult> insertRet2 =
+ milvusClient.insert(
+ InsertParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .withRows(rows)
+ .build());
+
+ if (insertRet.getStatus() != R.Status.Success.getCode()
+ || insertRet2.getStatus() != R.Status.Success.getCode()) {
throw new RuntimeException("Failed to insert! Error: " +
insertRet.getMessage());
}
}
@@ -322,6 +439,43 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
Assertions.assertTrue(fileds.contains(TITLE_FIELD));
}
+ @TestTemplate
+ public void testMilvusWithPartitionKey(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/milvus-to-milvus-with-partitionkey.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // assert table exists
+ R<Boolean> hasCollectionResponse =
+ this.milvusClient.hasCollection(
+ HasCollectionParam.newBuilder()
+ .withDatabaseName("test")
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .build());
+ Assertions.assertTrue(hasCollectionResponse.getData());
+
+ // check table fields
+ R<DescribeCollectionResponse> describeCollectionResponseR =
+ this.milvusClient.describeCollection(
+ DescribeCollectionParam.newBuilder()
+ .withDatabaseName("test")
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
+ .build());
+
+ DescribeCollectionResponse data =
describeCollectionResponseR.getData();
+ List<String> fileds =
+ data.getSchema().getFieldsList().stream()
+ .map(FieldSchema::getName)
+ .collect(Collectors.toList());
+ Assertions.assertTrue(fileds.contains(ID_FIELD));
+ Assertions.assertTrue(fileds.contains(VECTOR_FIELD));
+ Assertions.assertTrue(fileds.contains(VECTOR_FIELD2));
+ Assertions.assertTrue(fileds.contains(VECTOR_FIELD3));
+ Assertions.assertTrue(fileds.contains(VECTOR_FIELD4));
+ Assertions.assertTrue(fileds.contains(TITLE_FIELD));
+ }
+
@TestTemplate
public void testFakeToMilvus(TestContainer container) throws IOException,
InterruptedException {
Container.ExecResult execResult =
container.executeJob("/fake-to-milvus.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitionkey.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitionkey.conf
new file mode 100644
index 0000000000..2c1c45af8e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitionkey.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ collection = "simple_example_with_partitionkey"
+ }
+}
+
+sink {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ database="test"
+ collection = "simple_example_with_partitionkey"
+ }
+}
\ No newline at end of file