This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 582f9c7184 [Fix][Connect-V2][Milvus] Fix partition creation (#10589)
582f9c7184 is described below
commit 582f9c7184123644a54710b0f1e4a4f1cbef06a5
Author: 许晓峰 <[email protected]>
AuthorDate: Tue Mar 17 21:41:45 2026 +0800
[Fix][Connect-V2][Milvus] Fix partition creation (#10589)
---
.../seatunnel/milvus/catalog/MilvusCatalog.java | 49 ++++++-
.../seatunnel/milvus/utils/MilvusConvertUtils.java | 2 +-
.../milvus/catalog/MilvusCatalogTest.java | 150 +++++++++++++++++++++
.../milvus/utils/MilvusConvertUtilsTest.java | 119 ++++++++++++++++
.../e2e/connector/v2/milvus/MilvusIT.java | 128 +++++++++++++++++-
.../milvus-to-milvus-with-partitions.conf | 38 ++++++
6 files changed, 481 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
index 68a15e5a5e..6343bf253b 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
@@ -198,6 +198,10 @@ public class MilvusCatalog implements Catalog {
checkNotNull(catalogTable, "catalogTable must not be null");
TableSchema tableSchema = catalogTable.getTableSchema();
checkNotNull(tableSchema, "tableSchema must not be null");
+ log.info(
+ "Start creating Milvus collection. database={}, collection={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName());
createTableInternal(tablePath, catalogTable);
if (CollectionUtils.isNotEmpty(tableSchema.getConstraintKeys())
@@ -206,10 +210,19 @@ public class MilvusCatalog implements Catalog {
if (constraintKey
.getConstraintType()
.equals(ConstraintKey.ConstraintType.VECTOR_INDEX_KEY)) {
+ log.info(
+ "Creating Milvus vector indexes. database={}, collection={}, constraintName={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ constraintKey.getConstraintName());
createIndexInternal(tablePath,
constraintKey.getColumnNames());
}
}
}
+ log.info(
+ "Finished creating Milvus collection. database={}, collection={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName());
}
private void createIndexInternal(
@@ -294,15 +307,26 @@ public class MilvusCatalog implements Catalog {
}
CreateCollectionParam createCollectionParam = builder.build();
+ log.info(
+ "Creating Milvus collection metadata. database={}, collection={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName());
R<RpcStatus> response =
this.client.createCollection(createCollectionParam);
if (!Objects.equals(response.getStatus(),
R.success().getStatus())) {
throw new MilvusConnectorException(
MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR,
response.getMessage());
}
- // not exist partition key field, will read show partitions to create
- if (!existPartitionKeyField &&
options.containsKey(MilvusOptions.PARTITION_KEY_FIELD)) {
-
createPartitionInternal(options.get(MilvusOptions.PARTITION_KEY_FIELD),
tablePath);
+ // When collection does not have a partition key field,
+ // create partitions from the 'partitionNames' option
+ String partitionNames = options.get(MilvusOptions.PARTITION_NAMES);
+ if (!existPartitionKeyField &&
StringUtils.isNotBlank(partitionNames)) {
+ log.info(
+ "Creating Milvus partitions. database={}, collection={}, partitionNames={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ partitionNames);
+ createPartitionInternal(partitionNames, tablePath);
}
} catch (Exception e) {
@@ -329,9 +353,28 @@ public class MilvusCatalog implements Catalog {
// start to loop create partition
String[] partitionNameArray = partitionNames.split(",");
for (String partitionName : partitionNameArray) {
+ partitionName = partitionName.trim();
+ if (StringUtils.isBlank(partitionName) ||
"_default".equals(partitionName)) {
+ log.info(
+ "Skip Milvus partition creation. database={}, collection={}, partitionName={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ partitionName);
+ continue;
+ }
if (existPartitionNames.contains(partitionName)) {
+ log.info(
+ "Milvus partition already exists. database={}, collection={}, partitionName={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ partitionName);
continue;
}
+ log.info(
+ "Creating Milvus partition. database={}, collection={}, partitionName={}",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ partitionName);
R<RpcStatus> response =
this.client.createPartition(
CreatePartitionParam.newBuilder()
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
index 84ba5168e0..9e985e1b66 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
@@ -252,7 +252,7 @@ public class MilvusConvertUtils {
}
list.add(partition);
}
- if (CollectionUtils.isEmpty(partitionNamesList)) {
+ if (CollectionUtils.isEmpty(list)) {
return;
}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogTest.java b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogTest.java
new file mode 100644
index 0000000000..01eb93746f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.ShowPartitionsResponse;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.partition.CreatePartitionParam;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class MilvusCatalogTest {
+
+ @Test
+ void createPartitionInternalSkipsEmptyString() throws Exception {
+ MilvusCatalog catalog =
createCatalogWithClient(mockClientWithDefaultPartitions());
+ invokeCreatePartitionInternal(catalog, "", TablePath.of("db", null,
"coll"));
+ verify(getClient(catalog), never()).createPartition(any());
+ }
+
+ @Test
+ void createPartitionInternalSkipsOnlyCommas() throws Exception {
+ MilvusCatalog catalog =
createCatalogWithClient(mockClientWithDefaultPartitions());
+ invokeCreatePartitionInternal(catalog, ",,,", TablePath.of("db", null,
"coll"));
+ verify(getClient(catalog), never()).createPartition(any());
+ }
+
+ @Test
+ void createPartitionInternalSkipsSpaces() throws Exception {
+ MilvusCatalog catalog =
createCatalogWithClient(mockClientWithDefaultPartitions());
+ invokeCreatePartitionInternal(catalog, " ", TablePath.of("db", null,
"coll"));
+ verify(getClient(catalog), never()).createPartition(any());
+ }
+
+ @Test
+ void createPartitionInternalSkipsDefaultPartitionName() throws Exception {
+ MilvusServiceClient client = mockClientWithDefaultPartitions();
+ R<RpcStatus> successRpcStatusR = mock(R.class);
+
when(successRpcStatusR.getStatus()).thenReturn(R.Status.Success.getCode());
+ when(successRpcStatusR.getMessage()).thenReturn("OK");
+ when(client.createPartition(any()))
+ .thenAnswer(
+ invocation -> {
+ CreatePartitionParam param =
invocation.getArgument(0);
+ String partitionName = extractPartitionName(param);
+ if (partitionName == null
+ || partitionName.trim().isEmpty()
+ || "_default".equals(partitionName)) {
+ throw new RuntimeException(
+ "invalid partitionName: " +
partitionName);
+ }
+ return successRpcStatusR;
+ });
+
+ MilvusCatalog catalog = createCatalogWithClient(client);
+ invokeCreatePartitionInternal(catalog, "_default, p1",
TablePath.of("db", null, "coll"));
+
+ verify(client, times(1)).createPartition(any());
+ }
+
+ private MilvusCatalog createCatalogWithClient(MilvusServiceClient client)
throws Exception {
+ MilvusCatalog catalog =
+ new MilvusCatalog("milvus",
ReadonlyConfig.fromMap(Collections.emptyMap()));
+ Field clientField = MilvusCatalog.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ clientField.set(catalog, client);
+ return catalog;
+ }
+
+ private MilvusServiceClient mockClientWithDefaultPartitions() {
+ MilvusServiceClient client = mock(MilvusServiceClient.class);
+ @SuppressWarnings("unchecked")
+ R<ShowPartitionsResponse> showPartitionsR = mock(R.class);
+
when(showPartitionsR.getStatus()).thenReturn(R.Status.Success.getCode());
+ when(showPartitionsR.getData())
+ .thenReturn(
+
ShowPartitionsResponse.newBuilder().addPartitionNames("_default").build());
+ when(showPartitionsR.getMessage()).thenReturn("OK");
+ when(client.showPartitions(any())).thenReturn(showPartitionsR);
+
+ @SuppressWarnings("unchecked")
+ R<RpcStatus> createPartitionR = mock(R.class);
+
when(createPartitionR.getStatus()).thenReturn(R.Status.Success.getCode());
+ when(createPartitionR.getMessage()).thenReturn("OK");
+ when(client.createPartition(any())).thenReturn(createPartitionR);
+ return client;
+ }
+
+ private void invokeCreatePartitionInternal(
+ MilvusCatalog catalog, String partitionNames, TablePath tablePath)
throws Exception {
+ Method method =
+ MilvusCatalog.class.getDeclaredMethod(
+ "createPartitionInternal", String.class,
TablePath.class);
+ method.setAccessible(true);
+ Assertions.assertDoesNotThrow(() -> method.invoke(catalog,
partitionNames, tablePath));
+ }
+
+ private MilvusServiceClient getClient(MilvusCatalog catalog) throws
Exception {
+ Field clientField = MilvusCatalog.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ return (MilvusServiceClient) clientField.get(catalog);
+ }
+
+ private String extractPartitionName(CreatePartitionParam param) {
+ try {
+ Method getter = param.getClass().getMethod("getPartitionName");
+ Object v = getter.invoke(param);
+ return v == null ? null : v.toString();
+ } catch (Exception ignored) {
+ }
+ try {
+ Field f = param.getClass().getDeclaredField("partitionName");
+ f.setAccessible(true);
+ Object v = f.get(param);
+ return v == null ? null : v.toString();
+ } catch (Exception ignored) {
+ }
+ return null;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtilsTest.java b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtilsTest.java
new file mode 100644
index 0000000000..ede5cd5349
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/src/test/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtilsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.milvus.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.CollectionSchema;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.DescribeIndexResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.ShowPartitionsResponse;
+import io.milvus.param.R;
+
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MilvusConvertUtilsTest {
+
+ @Test
+ void getCatalogTableDoesNotSetPartitionNamesWhenOnlyDefaultPartition() {
+ ReadonlyConfig config = ReadonlyConfig.fromMap(Collections.emptyMap());
+ MilvusConvertUtils utils = new MilvusConvertUtils(config);
+ MilvusServiceClient client = mock(MilvusServiceClient.class);
+
+ mockDescribeCollection(client);
+ mockDescribeIndex(client);
+ mockShowPartitions(
+ client,
ShowPartitionsResponse.newBuilder().addPartitionNames("_default").build());
+
+ CatalogTable table = utils.getCatalogTable(client, "db", "coll");
+
Assertions.assertFalse(table.getOptions().containsKey(MilvusOptions.PARTITION_NAMES));
+ }
+
+ @Test
+ void getCatalogTableSetsPartitionNamesExcludingDefaultPartition() {
+ ReadonlyConfig config = ReadonlyConfig.fromMap(Collections.emptyMap());
+ MilvusConvertUtils utils = new MilvusConvertUtils(config);
+ MilvusServiceClient client = mock(MilvusServiceClient.class);
+
+ mockDescribeCollection(client);
+ mockDescribeIndex(client);
+ mockShowPartitions(
+ client,
+ ShowPartitionsResponse.newBuilder()
+ .addPartitionNames("_default")
+ .addPartitionNames("p1")
+ .addPartitionNames("p2")
+ .build());
+
+ CatalogTable table = utils.getCatalogTable(client, "db", "coll");
+ Assertions.assertEquals("p1,p2",
table.getOptions().get(MilvusOptions.PARTITION_NAMES));
+ }
+
+ private void mockDescribeCollection(MilvusServiceClient client) {
+ FieldSchema idField =
+ FieldSchema.newBuilder()
+ .setName("id")
+ .setDataType(DataType.Int64)
+ .setIsPrimaryKey(true)
+ .build();
+ CollectionSchema schema =
+ CollectionSchema.newBuilder()
+ .addFields(idField)
+ .setEnableDynamicField(false)
+ .setDescription("desc")
+ .build();
+ DescribeCollectionResponse describeCollectionResponse =
+
DescribeCollectionResponse.newBuilder().setSchema(schema).setShardsNum(1).build();
+
+ @SuppressWarnings("unchecked")
+ R<DescribeCollectionResponse> response = mock(R.class);
+ when(response.getStatus()).thenReturn(R.Status.Success.getCode());
+ when(response.getData()).thenReturn(describeCollectionResponse);
+ when(client.describeCollection(any())).thenReturn(response);
+ }
+
+ private void mockDescribeIndex(MilvusServiceClient client) {
+ DescribeIndexResponse describeIndexResponse =
DescribeIndexResponse.newBuilder().build();
+
+ @SuppressWarnings("unchecked")
+ R<DescribeIndexResponse> response = mock(R.class);
+ when(response.getStatus()).thenReturn(R.Status.Success.getCode());
+ when(response.getData()).thenReturn(describeIndexResponse);
+ when(client.describeIndex(any())).thenReturn(response);
+ }
+
+ private void mockShowPartitions(
+ MilvusServiceClient client, ShowPartitionsResponse
showPartitionsResponse) {
+ @SuppressWarnings("unchecked")
+ R<ShowPartitionsResponse> response = mock(R.class);
+ when(response.getStatus()).thenReturn(R.Status.Success.getCode());
+ when(response.getData()).thenReturn(showPartitionsResponse);
+ when(client.showPartitions(any())).thenReturn(response);
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 1ad1fc9882..fe78508d7b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -73,6 +73,8 @@ import io.milvus.param.dml.InsertParam;
import io.milvus.param.dml.QueryParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.param.index.DescribeIndexParam;
+import io.milvus.param.partition.CreatePartitionParam;
+import io.milvus.param.partition.ShowPartitionsParam;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -107,6 +109,9 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
private static final String COLLECTION_NAME_2 = "simple_example_2";
private static final String COLLECTION_NAME_WITH_PARTITIONKEY =
"simple_example_with_partitionkey";
+ private static final String COLLECTION_NAME_WITH_PARTITIONS =
"simple_example_with_partitions";
+ private static final String COLLECTION_NAME_SOURCE_WITH_PARTITIONS =
+ "simple_example_source_with_partitions";
private static final String ID_FIELD = "book_id";
private static final String VECTOR_FIELD = "book_intro";
private static final String VECTOR_FIELD2 = "book_kind";
@@ -253,6 +258,93 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
log.info("Collection created");
+ R<RpcStatus> retWithPartitions =
+ milvusClient.createCollection(
+ CreateCollectionParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withFieldTypes(fieldsSchema)
+ .build());
+ if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create collection! Error: " +
retWithPartitions.getMessage());
+ }
+ retWithPartitions =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withFieldName(VECTOR_FIELD)
+ .withIndexType(IndexType.FLAT)
+ .withMetricType(MetricType.L2)
+ .build());
+ if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: "
+ + retWithPartitions.getMessage());
+ }
+ retWithPartitions =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withFieldName(VECTOR_FIELD2)
+ .withIndexType(IndexType.FLAT)
+ .withMetricType(MetricType.L2)
+ .build());
+ if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: "
+ + retWithPartitions.getMessage());
+ }
+ retWithPartitions =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withFieldName(VECTOR_FIELD3)
+ .withIndexType(IndexType.BIN_FLAT)
+ .withMetricType(MetricType.HAMMING)
+ .build());
+ if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: "
+ + retWithPartitions.getMessage());
+ }
+ retWithPartitions =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withFieldName(VECTOR_FIELD4)
+ .withIndexType(IndexType.SPARSE_INVERTED_INDEX)
+ .withMetricType(MetricType.IP)
+ .build());
+ if (retWithPartitions.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create index on vector field! Error: "
+ + retWithPartitions.getMessage());
+ }
+ milvusClient.loadCollection(
+ LoadCollectionParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .build());
+ R<RpcStatus> partitionRet =
+ milvusClient.createPartition(
+ CreatePartitionParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withPartitionName("p1")
+ .build());
+ if (partitionRet.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create partition! Error: " +
partitionRet.getMessage());
+ }
+ partitionRet =
+ milvusClient.createPartition(
+ CreatePartitionParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withPartitionName("p2")
+ .build());
+ if (partitionRet.getStatus() != R.Status.Success.getCode()) {
+ throw new RuntimeException(
+ "Failed to create partition! Error: " +
partitionRet.getMessage());
+ }
+
// Define fields With Partition Key
List<FieldType> fieldsSchemaWithPartitionKey =
Arrays.asList(
@@ -395,9 +487,16 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
.withRows(rows)
.build());
+ R<MutationResult> insertRet3 =
+ milvusClient.insert(
+ InsertParam.newBuilder()
+
.withCollectionName(COLLECTION_NAME_SOURCE_WITH_PARTITIONS)
+ .withRows(rows)
+ .build());
if (insertRet.getStatus() != R.Status.Success.getCode()
- || insertRet2.getStatus() != R.Status.Success.getCode()) {
+ || insertRet2.getStatus() != R.Status.Success.getCode()
+ || insertRet3.getStatus() != R.Status.Success.getCode()) {
throw new RuntimeException("Failed to insert! Error: " +
insertRet.getMessage());
}
}
@@ -484,6 +583,33 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
Assertions.assertTrue(fields.contains(TITLE_FIELD));
}
+ @TestTemplate
+ public void testMilvusWithPartitions(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/milvus-to-milvus-with-partitions.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ R<Boolean> hasCollectionResponse =
+ this.milvusClient.hasCollection(
+ HasCollectionParam.newBuilder()
+ .withDatabaseName("test")
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONS)
+ .build());
+ Assertions.assertTrue(hasCollectionResponse.getData());
+
+ R<io.milvus.grpc.ShowPartitionsResponse> showPartitionsResponse =
+ this.milvusClient.showPartitions(
+ ShowPartitionsParam.newBuilder()
+ .withDatabaseName("test")
+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONS)
+ .build());
+ Assertions.assertEquals(R.Status.Success.getCode(),
showPartitionsResponse.getStatus());
+ List<String> partitionNames =
showPartitionsResponse.getData().getPartitionNamesList();
+ Assertions.assertTrue(partitionNames.contains("p1"));
+ Assertions.assertTrue(partitionNames.contains("p2"));
+ }
+
@TestTemplate
public void testFakeToMilvus(TestContainer container) throws IOException,
InterruptedException {
Container.ExecResult execResult =
container.executeJob("/fake-to-milvus.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitions.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitions.conf
new file mode 100644
index 0000000000..95cb728104
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus-with-partitions.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ collection = "simple_example_source_with_partitions"
+ }
+}
+
+sink {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ database = "test"
+ collection = "simple_example_with_partitions"
+ }
+}