This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0c69b9166e [Feature][Connector-V2][Milvus] Support Milvus source &
sink (#7158)
0c69b9166e is described below
commit 0c69b9166efb9637f83f0f8f817250f2cce15522
Author: Thomas-HuWei <[email protected]>
AuthorDate: Fri Jul 12 17:50:26 2024 +0800
[Feature][Connector-V2][Milvus] Support Milvus source & sink (#7158)
---
config/plugin_config | 1 +
docs/en/connector-v2/sink/Mivlus.md | 59 +++
docs/en/connector-v2/source/Mivlus.md | 55 +++
plugin-mapping.properties | 2 +
.../apache/seatunnel/api/table/catalog/Column.java | 3 +-
.../seatunnel/api/table/catalog/ConstraintKey.java | 3 +-
.../seatunnel/api/table/catalog/PrimaryKey.java | 19 +
.../seatunnel/api/table/catalog/VectorIndex.java | 110 ++++++
.../seatunnel/api/table/type/SeaTunnelRow.java | 2 +
.../apache/seatunnel/api/table/type/SqlType.java | 5 +
.../seatunnel/api/table/type/VectorType.java | 85 +++++
seatunnel-connectors-v2/connector-milvus/pom.xml | 60 ++++
.../seatunnel/milvus/catalog/MilvusCatalog.java | 380 ++++++++++++++++++++
.../milvus/catalog/MilvusCatalogFactory.java | 45 +++
.../seatunnel/milvus/catalog/MilvusOptions.java | 24 +-
.../seatunnel/milvus/config/MilvusSinkConfig.java | 87 +++++
.../milvus/config/MilvusSourceConfig.java | 48 +++
.../milvus/convert/MilvusConvertUtils.java | 397 +++++++++++++++++++++
.../exception/MilvusConnectionErrorCode.java | 57 +++
.../milvus/exception/MilvusConnectorException.java | 41 +++
.../seatunnel/milvus/sink/MilvusSink.java | 116 ++++++
.../seatunnel/milvus/sink/MilvusSinkCommitter.java | 56 +++
.../seatunnel/milvus/sink/MilvusSinkFactory.java | 80 +++++
.../seatunnel/milvus/sink/MilvusSinkWriter.java | 129 +++++++
.../milvus/sink/batch/MilvusBatchWriter.java | 33 +-
.../milvus/sink/batch/MilvusBufferBatchWriter.java | 143 ++++++++
.../seatunnel/milvus/source/MilvusSource.java | 82 +++++
.../milvus/source/MilvusSourceFactory.java | 61 ++++
.../milvus/source/MilvusSourceReader.java | 261 ++++++++++++++
.../seatunnel/milvus/source/MilvusSourceSplit.java | 39 +-
.../milvus/source/MilvusSourceSplitEnumertor.java | 192 ++++++++++
.../seatunnel/milvus/source/MilvusSourceState.java | 36 +-
.../milvus/state/MilvusAggregatedCommitInfo.java | 32 +-
.../seatunnel/milvus/state/MilvusCommitInfo.java | 31 +-
.../seatunnel/milvus/state/MilvusSinkState.java | 33 +-
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 7 +
.../connector-milvus-e2e/pom.xml | 66 ++++
.../e2e/connector/v2/milvus/MilvusIT.java | 218 +++++++++++
.../src/test/resources/milvus-to-milvus.conf | 36 ++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
41 files changed, 2985 insertions(+), 151 deletions(-)
diff --git a/config/plugin_config b/config/plugin_config
index e642a30021..d80d2e6ab0 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -85,4 +85,5 @@ connector-paimon
connector-rocketmq
connector-tdengine
connector-web3j
+connector-milvus
--end--
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Mivlus.md
b/docs/en/connector-v2/sink/Mivlus.md
new file mode 100644
index 0000000000..081f427a5d
--- /dev/null
+++ b/docs/en/connector-v2/sink/Mivlus.md
@@ -0,0 +1,59 @@
+# Milvus
+
+> Milvus sink connector
+
+## Description
+
+Write data to Milvus or Zilliz Cloud
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+| Milvus Data Type | SeaTunnel Data Type |
+|---------------------|---------------------|
+| INT8 | TINYINT |
+| INT16 | SMALLINT |
+| INT32 | INT |
+| INT64 | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| BOOL | BOOLEAN |
+| JSON | STRING |
+| ARRAY | ARRAY |
+| VARCHAR | STRING |
+| FLOAT_VECTOR | FLOAT_VECTOR |
+| BINARY_VECTOR | BINARY_VECTOR |
+| FLOAT16_VECTOR | FLOAT16_VECTOR |
+| BFLOAT16_VECTOR | BFLOAT16_VECTOR |
+| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
+
+## Sink Options
+
+| Name | Type | Required | Default |
Description |
+|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
+| url | String | Yes | - |
The URL to connect to Milvus or Zilliz Cloud. |
+| token | String | Yes | - |
User:password |
+| database | String | No | - |
Database to write to; defaults to the source database. |
+| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST |
Automatically create the collection when it does not exist. |
+| enable_auto_id | boolean | No | false |
Enable auto-generated IDs (autoId) for the primary key column. |
+| enable_upsert | boolean | No | false |
Upsert data instead of inserting it. |
+| enable_dynamic_field | boolean | No | true |
Create the collection with dynamic fields enabled. |
+| batch_size | int | No | 1000 |
Write batch size. |
+
+## Task Example
+
+```bash
+sink {
+ Milvus {
+ url = "http://127.0.0.1:19530"
+ token = "username:password"
+ batch_size = 1000
+ }
+}
+```
+
diff --git a/docs/en/connector-v2/source/Mivlus.md
b/docs/en/connector-v2/source/Mivlus.md
new file mode 100644
index 0000000000..a56df4c5fe
--- /dev/null
+++ b/docs/en/connector-v2/source/Mivlus.md
@@ -0,0 +1,55 @@
+# Milvus
+
+> Milvus source connector
+
+## Description
+
+Read data from Milvus or Zilliz Cloud
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+## Data Type Mapping
+
+| Milvus Data Type | SeaTunnel Data Type |
+|---------------------|---------------------|
+| INT8 | TINYINT |
+| INT16 | SMALLINT |
+| INT32 | INT |
+| INT64 | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| BOOL | BOOLEAN |
+| JSON | STRING |
+| ARRAY | ARRAY |
+| VARCHAR | STRING |
+| FLOAT_VECTOR | FLOAT_VECTOR |
+| BINARY_VECTOR | BINARY_VECTOR |
+| FLOAT16_VECTOR | FLOAT16_VECTOR |
+| BFLOAT16_VECTOR | BFLOAT16_VECTOR |
+| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
+
+## Source Options
+
+| Name | Type | Required | Default |
Description |
+|------------|--------|----------|---------|--------------------------------------------------------------------------------------------|
+| url | String | Yes | - | The URL to connect to Milvus or
Zilliz Cloud. |
+| token | String | Yes | - | User:password
|
+| database | String | Yes | default | Database to read from.
|
+| collection | String | No | - | If set, only this collection is
read; otherwise all collections in the database are read. |
+
+## Task Example
+
+```bash
+source {
+ Milvus {
+ url = "http://127.0.0.1:19530"
+ token = "username:password"
+ database = "default"
+ }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 6304236ec3..9936afcbaa 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -127,3 +127,5 @@ seatunnel.source.Oracle-CDC = connector-cdc-oracle
seatunnel.sink.Pulsar = connector-pulsar
seatunnel.source.ObsFile = connector-file-obs
seatunnel.sink.ObsFile = connector-file-obs
+seatunnel.source.Milvus = connector-milvus
+seatunnel.sink.Milvus = connector-milvus
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index d7e236d309..9c3ed338c9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -60,7 +60,8 @@ public abstract class Column implements Serializable {
* Number of digits to right of the decimal point.
*
* <p>For decimal data, this is the maximum scale. For time/timestamp
data, this is the maximum
- * allowed precision of the fractional seconds component.
+ * allowed precision of the fractional seconds component. For vector data,
this is the vector
+ * dimension.
*
* <p>Null is returned for data types where the scale is not applicable.
*/
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
index 2d39641a42..f2d62852a0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
@@ -72,7 +72,8 @@ public class ConstraintKey implements Serializable {
public enum ConstraintType {
INDEX_KEY,
UNIQUE_KEY,
- FOREIGN_KEY
+ FOREIGN_KEY,
+ VECTOR_INDEX_KEY
}
public enum ColumnSortType {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
index e8a3a74025..ad88539c2f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
@@ -34,6 +34,25 @@ public class PrimaryKey implements Serializable {
private final List<String> columnNames;
+ private Boolean enableAutoId;
+
+ public PrimaryKey(String primaryKey, List<String> columnNames) {
+ this.primaryKey = primaryKey;
+ this.columnNames = columnNames;
+ this.enableAutoId = null;
+ }
+
+    /**
+     * Tells whether {@code fieldName} is one of the columns of {@code primaryKey}.
+     *
+     * @param primaryKey the primary key to inspect; may be {@code null}
+     * @param fieldName the column name to look up
+     * @return {@code true} only when the key and its column list are non-null and the list
+     *     contains {@code fieldName}
+     */
+    public static boolean isPrimaryKeyField(PrimaryKey primaryKey, String fieldName) {
+        if (primaryKey != null && primaryKey.getColumnNames() != null) {
+            return primaryKey.getColumnNames().contains(fieldName);
+        }
+        return false;
+    }
+
+ public static PrimaryKey of(String primaryKey, List<String> columnNames,
Boolean autoId) {
+ return new PrimaryKey(primaryKey, columnNames, autoId);
+ }
+
public static PrimaryKey of(String primaryKey, List<String> columnNames) {
return new PrimaryKey(primaryKey, columnNames);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java
new file mode 100644
index 0000000000..5d6dd1beaa
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/VectorIndex.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+/**
+ * Describes the special index a vector database builds on one of its vector fields.
+ *
+ * <p>The index is modelled as a {@link ConstraintKey.ConstraintKeyColumn} whose sort type is
+ * always {@code null}; the column name identifies the indexed vector field.
+ */
+@EqualsAndHashCode(callSuper = true)
+@Getter
+public class VectorIndex extends ConstraintKey.ConstraintKeyColumn implements Serializable {
+
+    /** Vector index name */
+    private final String indexName;
+
+    /** Vector indexType, such as IVF_FLAT, HNSW, DISKANN */
+    private final IndexType indexType;
+
+    /** Vector index metricType, such as L2, IP, COSINE */
+    private final MetricType metricType;
+
+    /**
+     * Creates an index description from textual index and metric type names.
+     *
+     * @param indexName name of the index
+     * @param columnName name of the indexed column
+     * @param indexType case-insensitive name of an {@link IndexType} constant
+     * @param metricType case-insensitive name of a {@link MetricType} constant
+     * @throws IllegalArgumentException if either type name matches no constant
+     */
+    public VectorIndex(String indexName, String columnName, String indexType, String metricType) {
+        super(columnName, null);
+        this.indexName = indexName;
+        this.indexType = IndexType.of(indexType);
+        this.metricType = MetricType.of(metricType);
+    }
+
+    /**
+     * Creates an index description from already-resolved enum values.
+     *
+     * @param indexName name of the index
+     * @param columnName name of the indexed column
+     * @param indexType index algorithm
+     * @param metricType distance metric
+     */
+    public VectorIndex(
+            String indexName, String columnName, IndexType indexType, MetricType metricType) {
+        super(columnName, null);
+        this.indexName = indexName;
+        this.indexType = indexType;
+        this.metricType = metricType;
+    }
+
+    /** Returns a new {@code VectorIndex} with the same name, column, index type and metric. */
+    @Override
+    public ConstraintKey.ConstraintKeyColumn copy() {
+        return new VectorIndex(indexName, getColumnName(), indexType, metricType);
+    }
+
+    public enum IndexType {
+        FLAT,
+        IVF_FLAT,
+        IVF_SQ8,
+        IVF_PQ,
+        HNSW,
+        DISKANN,
+        AUTOINDEX,
+        SCANN,
+
+        // GPU indexes only for float vectors
+        GPU_IVF_FLAT,
+        GPU_IVF_PQ,
+        GPU_BRUTE_FORCE,
+        GPU_CAGRA,
+
+        // Only supported for binary vectors
+        BIN_FLAT,
+        BIN_IVF_FLAT,
+
+        // Only for varchar type field
+        TRIE,
+        // Only for scalar type field
+        STL_SORT, // only for numeric type field
+        INVERTED, // works for all scalar fields except JSON type field
+
+        // Only for sparse vectors
+        SPARSE_INVERTED_INDEX,
+        SPARSE_WAND,
+        ;
+
+        /**
+         * Resolves a constant from its name, ignoring case.
+         *
+         * @param name constant name in any letter case
+         * @return the matching constant
+         * @throws IllegalArgumentException if no constant has that name
+         */
+        public static IndexType of(String name) {
+            // Locale.ROOT keeps the mapping stable under default locales with special casing
+            // rules (e.g. Turkish, where "i" upper-cases to a dotted capital I).
+            return valueOf(name.toUpperCase(java.util.Locale.ROOT));
+        }
+    }
+
+    public enum MetricType {
+        // Only for float vectors
+        L2,
+        IP,
+        COSINE,
+
+        // Only for binary vectors
+        HAMMING,
+        JACCARD,
+        ;
+
+        /**
+         * Resolves a constant from its name, ignoring case.
+         *
+         * @param name constant name in any letter case
+         * @return the matching constant
+         * @throws IllegalArgumentException if no constant has that name
+         */
+        public static MetricType of(String name) {
+            // Locale.ROOT: see IndexType.of for why the default locale must not be used.
+            return valueOf(name.toUpperCase(java.util.Locale.ROOT));
+        }
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 1e507cb1fa..95a36b796c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -141,6 +141,8 @@ public final class SeaTunnelRow implements Serializable {
return 12;
case TIMESTAMP:
return 48;
+ case FLOAT_VECTOR:
+ return getArrayNotNullSize((Object[]) v) * 4;
case ARRAY:
SeaTunnelDataType elementType = ((ArrayType)
dataType).getElementType();
if (elementType instanceof DecimalType) {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
index 838a384809..e33ceb8d3c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
@@ -35,6 +35,11 @@ public enum SqlType {
DATE,
TIME,
TIMESTAMP,
+ BINARY_VECTOR,
+ FLOAT_VECTOR,
+ FLOAT16_VECTOR,
+ BFLOAT16_VECTOR,
+ SPARSE_FLOAT_VECTOR,
ROW,
MULTIPLE_ROW;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
new file mode 100644
index 0000000000..39d2849f1a
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/VectorType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * SeaTunnel data type for vector columns.
+ *
+ * <p>Each instance pairs the Java class used to carry the value with the {@link SqlType} that
+ * identifies the vector flavour. The supported flavours are exposed as shared constants.
+ *
+ * @param <T> Java class associated with the vector value
+ */
+public class VectorType<T> implements SeaTunnelDataType<T> {
+    private static final long serialVersionUID = 2L;
+
+    /** {@link SqlType#FLOAT_VECTOR} paired with {@link Float}. */
+    public static final VectorType<Float> VECTOR_FLOAT_TYPE =
+            new VectorType<>(Float.class, SqlType.FLOAT_VECTOR);
+
+    /** {@link SqlType#SPARSE_FLOAT_VECTOR} paired with {@link Map}. */
+    public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE =
+            new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR);
+
+    /** {@link SqlType#BINARY_VECTOR} paired with {@link Byte}. */
+    public static final VectorType<Byte> VECTOR_BINARY_TYPE =
+            new VectorType<>(Byte.class, SqlType.BINARY_VECTOR);
+
+    /** {@link SqlType#FLOAT16_VECTOR} paired with {@link ByteBuffer}. */
+    public static final VectorType<ByteBuffer> VECTOR_FLOAT16_TYPE =
+            new VectorType<>(ByteBuffer.class, SqlType.FLOAT16_VECTOR);
+
+    /** {@link SqlType#BFLOAT16_VECTOR} paired with {@link ByteBuffer}. */
+    public static final VectorType<ByteBuffer> VECTOR_BFLOAT16_TYPE =
+            new VectorType<>(ByteBuffer.class, SqlType.BFLOAT16_VECTOR);
+
+    /** The physical type class. */
+    private final Class<T> typeClass;
+
+    /** The SQL type tag reported by {@link #getSqlType()}. */
+    private final SqlType sqlType;
+
+    /**
+     * Creates a vector type.
+     *
+     * @param typeClass Java class associated with the value
+     * @param sqlType vector flavour
+     */
+    protected VectorType(Class<T> typeClass, SqlType sqlType) {
+        this.typeClass = typeClass;
+        this.sqlType = sqlType;
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return typeClass;
+    }
+
+    @Override
+    public SqlType getSqlType() {
+        return sqlType;
+    }
+
+    /** Two vector types are equal when both their type class and SQL type are equal. */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof VectorType) {
+            VectorType<?> other = (VectorType<?>) obj;
+            return Objects.equals(typeClass, other.typeClass)
+                    && Objects.equals(sqlType, other.sqlType);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(typeClass, sqlType);
+    }
+
+    /** Returns the string form of the SQL type. */
+    @Override
+    public String toString() {
+        return sqlType.toString();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-milvus/pom.xml
b/seatunnel-connectors-v2/connector-milvus/pom.xml
new file mode 100644
index 0000000000..50d69d4f5b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-milvus/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-milvus</artifactId>
+ <name>SeaTunnel : Connectors V2 : Milvus</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.milvus</groupId>
+ <artifactId>milvus-sdk-java</artifactId>
+ <version>2.4.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>4.11.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>4.11.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
new file mode 100644
index 0000000000..dcca41320c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.VectorIndex;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.ListDatabasesResponse;
+import io.milvus.grpc.ShowCollectionsResponse;
+import io.milvus.grpc.ShowType;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.CreateDatabaseParam;
+import io.milvus.param.collection.DropCollectionParam;
+import io.milvus.param.collection.DropDatabaseParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.ShowCollectionsParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Slf4j
+public class MilvusCatalog implements Catalog {
+
+ private final String catalogName;
+ private final ReadonlyConfig config;
+
+ private MilvusServiceClient client;
+
+    /**
+     * Stores the catalog name and configuration; the Milvus client is only created by {@link
+     * #open()}.
+     *
+     * @param catalogName name reported by {@link #name()}
+     * @param config configuration the connection settings are read from
+     */
+    public MilvusCatalog(String catalogName, ReadonlyConfig config) {
+        this.config = config;
+        this.catalogName = catalogName;
+    }
+
+ @Override
+ public void open() throws CatalogException {
+ ConnectParam connectParam =
+ ConnectParam.newBuilder()
+ .withUri(config.get(MilvusSinkConfig.URL))
+ .withToken(config.get(MilvusSinkConfig.TOKEN))
+ .build();
+ try {
+ this.client = new MilvusServiceClient(connectParam);
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed to open catalog
%s", catalogName), e);
+ }
+ }
+
+    /**
+     * Closes the Milvus client if one was opened.
+     *
+     * <p>Does nothing when {@link #open()} was never called or failed, so the catalog can be
+     * closed unconditionally (for example from a {@code finally} block). The client reference is
+     * cleared afterwards, which makes repeated calls harmless.
+     */
+    @Override
+    public void close() throws CatalogException {
+        if (client == null) {
+            return;
+        }
+        client.close();
+        client = null;
+    }
+
+    /** Returns the catalog name given at construction time. */
+    @Override
+    public String name() {
+        return this.catalogName;
+    }
+
+    /**
+     * Builds a human-readable preview of a catalog action without executing it.
+     *
+     * @param actionType the action to preview
+     * @param tablePath the database/collection the action targets
+     * @param catalogTable not used by this implementation
+     * @return an informational preview describing the action
+     * @throws UnsupportedOperationException for any action type other than create/drop of a
+     *     table or database
+     */
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            return new InfoPreviewResult("create collection " + tablePath.getTableName());
+        }
+        if (actionType == ActionType.DROP_TABLE) {
+            return new InfoPreviewResult("drop collection " + tablePath.getTableName());
+        }
+        if (actionType == ActionType.CREATE_DATABASE) {
+            return new InfoPreviewResult("create database " + tablePath.getDatabaseName());
+        }
+        if (actionType == ActionType.DROP_DATABASE) {
+            return new InfoPreviewResult("drop database " + tablePath.getDatabaseName());
+        }
+        throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+    }
+
+    /** Returns the fixed database name {@code "default"}. */
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        final String defaultDatabase = "default";
+        return defaultDatabase;
+    }
+
+    /**
+     * Checks whether a database is among those returned by {@link #listDatabases()}.
+     *
+     * @param databaseName database name to look for
+     * @return {@code true} if the name is listed
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return this.listDatabases().contains(databaseName);
+    }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ R<ListDatabasesResponse> response = this.client.listDatabases();
+ return response.getData().getDbNamesList();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ R<ShowCollectionsResponse> response =
+ this.client.showCollections(
+ ShowCollectionsParam.newBuilder()
+ .withDatabaseName(databaseName)
+ .withShowType(ShowType.All)
+ .build());
+
+ return response.getData().getCollectionNamesList();
+ }
+
+    /**
+     * Asks the server whether a collection exists.
+     *
+     * @param tablePath database and collection to check
+     * @return the server's answer
+     * @throws MilvusConnectorException if the response carries no answer
+     */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        HasCollectionParam request =
+                HasCollectionParam.newBuilder()
+                        .withDatabaseName(tablePath.getDatabaseName())
+                        .withCollectionName(tablePath.getTableName())
+                        .build();
+        R<Boolean> response = this.client.hasCollection(request);
+        if (response.getData() == null) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    response.getMessage(),
+                    response.getException());
+        }
+        return response.getData();
+    }
+
+    /**
+     * Not supported by this catalog.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        // UnsupportedOperationException is the standard signal for an unimplemented operation
+        // and is still a RuntimeException, so existing catch blocks keep working.
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable catalogTable,
boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(catalogName,
tablePath.getDatabaseName());
+ }
+ if (tableExists(tablePath)) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new TableAlreadyExistException(catalogName, tablePath);
+ }
+
+ checkNotNull(catalogTable, "catalogTable must not be null");
+ TableSchema tableSchema = catalogTable.getTableSchema();
+ checkNotNull(tableSchema, "tableSchema must not be null");
+ createTableInternal(tablePath, catalogTable);
+
+ if (CollectionUtils.isNotEmpty(tableSchema.getConstraintKeys())) {
+ for (ConstraintKey constraintKey :
tableSchema.getConstraintKeys()) {
+ if (constraintKey
+ .getConstraintType()
+
.equals(ConstraintKey.ConstraintType.VECTOR_INDEX_KEY)) {
+ createIndexInternal(tablePath,
constraintKey.getColumnNames());
+ }
+ }
+ }
+ }
+
+    /**
+     * Creates one Milvus index per entry of {@code vectorIndexes}.
+     *
+     * @param tablePath database and collection the indexes belong to
+     * @param vectorIndexes index descriptions; every element is cast to {@link VectorIndex}
+     * @throws MilvusConnectorException if the server reports a non-success status
+     */
+    private void createIndexInternal(
+            TablePath tablePath, List<ConstraintKey.ConstraintKeyColumn> vectorIndexes) {
+        for (ConstraintKey.ConstraintKeyColumn keyColumn : vectorIndexes) {
+            VectorIndex vectorIndex = (VectorIndex) keyColumn;
+            // Translate the SeaTunnel enums to the Milvus SDK enums by constant name.
+            IndexType sdkIndexType = IndexType.valueOf(vectorIndex.getIndexType().name());
+            MetricType sdkMetricType = MetricType.valueOf(vectorIndex.getMetricType().name());
+
+            CreateIndexParam request =
+                    CreateIndexParam.newBuilder()
+                            .withDatabaseName(tablePath.getDatabaseName())
+                            .withCollectionName(tablePath.getTableName())
+                            .withFieldName(vectorIndex.getColumnName())
+                            .withIndexName(vectorIndex.getIndexName())
+                            .withIndexType(sdkIndexType)
+                            .withMetricType(sdkMetricType)
+                            .build();
+
+            R<RpcStatus> response = client.createIndex(request);
+            boolean succeeded = Objects.equals(response.getStatus(), R.success().getStatus());
+            if (!succeeded) {
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.CREATE_INDEX_ERROR, response.getMessage());
+            }
+        }
+    }
+
+ public void createTableInternal(TablePath tablePath, CatalogTable
catalogTable) {
+ try {
+ TableSchema tableSchema = catalogTable.getTableSchema();
+ List<FieldType> fieldTypes = new ArrayList<>();
+ for (Column column : tableSchema.getColumns()) {
+ fieldTypes.add(convertToFieldType(column,
tableSchema.getPrimaryKey()));
+ }
+
+ Map<String, String> options = catalogTable.getOptions();
+ Boolean enableDynamicField =
+ (options.containsKey(MilvusOptions.ENABLE_DYNAMIC_FIELD))
+ ?
Boolean.valueOf(options.get(MilvusOptions.ENABLE_DYNAMIC_FIELD))
+ :
config.get(MilvusSinkConfig.ENABLE_DYNAMIC_FIELD);
+
+ CreateCollectionParam.Builder builder =
+ CreateCollectionParam.newBuilder()
+ .withDatabaseName(tablePath.getDatabaseName())
+ .withCollectionName(tablePath.getTableName())
+ .withFieldTypes(fieldTypes)
+ .withEnableDynamicField(enableDynamicField)
+
.withConsistencyLevel(ConsistencyLevelEnum.BOUNDED);
+ if (null != catalogTable.getComment()) {
+ builder.withDescription(catalogTable.getComment());
+ }
+
+ CreateCollectionParam createCollectionParam = builder.build();
+ R<RpcStatus> response =
this.client.createCollection(createCollectionParam);
+ if (!Objects.equals(response.getStatus(),
R.success().getStatus())) {
+ throw new MilvusConnectorException(
+ MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR,
response.getMessage());
+ }
+ } catch (Exception e) {
+ throw new MilvusConnectorException(
+ MilvusConnectionErrorCode.CREATE_COLLECTION_ERROR, e);
+ }
+ }
+
+    /**
+     * Converts a SeaTunnel column into a Milvus field definition.
+     *
+     * <p>The Milvus data type comes from {@link MilvusConvertUtils#convertSqlTypeToDataType} and
+     * is then refined per SQL type (length limits, element type, vector dimension). Columns that
+     * are part of {@code primaryKey} are flagged as primary key; their autoId setting comes from
+     * the key when it defines one, otherwise from the catalog configuration.
+     *
+     * @param column column to convert
+     * @param primaryKey primary key of the table; may be {@code null}
+     * @return the Milvus field definition
+     */
+    private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) {
+        SeaTunnelDataType<?> seaTunnelDataType = column.getDataType();
+        FieldType.Builder build =
+                FieldType.newBuilder()
+                        .withName(column.getName())
+                        .withDataType(
+                                MilvusConvertUtils.convertSqlTypeToDataType(
+                                        seaTunnelDataType.getSqlType()));
+        switch (seaTunnelDataType.getSqlType()) {
+            case ROW:
+                build.withMaxLength(65535);
+                break;
+            case DATE:
+                build.withMaxLength(20);
+                break;
+            case INT:
+                build.withDataType(DataType.Int32);
+                break;
+            case SMALLINT:
+                build.withDataType(DataType.Int16);
+                break;
+            case TINYINT:
+                build.withDataType(DataType.Int8);
+                break;
+            case FLOAT:
+                build.withDataType(DataType.Float);
+                break;
+            case DOUBLE:
+                build.withDataType(DataType.Double);
+                break;
+            case MAP:
+                build.withDataType(DataType.JSON);
+                break;
+            case BOOLEAN:
+                build.withDataType(DataType.Bool);
+                break;
+            case STRING:
+                build.withMaxLength(resolveVarcharMaxLength(column));
+                break;
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) column.getDataType();
+                SeaTunnelDataType<?> elementType = arrayType.getElementType();
+                build.withElementType(
+                        MilvusConvertUtils.convertSqlTypeToDataType(elementType.getSqlType()));
+                build.withMaxCapacity(4095);
+                switch (elementType.getSqlType()) {
+                    case STRING:
+                        // String elements need a length limit, same rule as a STRING column.
+                        build.withMaxLength(resolveVarcharMaxLength(column));
+                        break;
+                    default:
+                        break;
+                }
+                break;
+            case BINARY_VECTOR:
+            case FLOAT_VECTOR:
+            case FLOAT16_VECTOR:
+            case BFLOAT16_VECTOR:
+                // For vector columns the scale carries the vector dimension.
+                build.withDimension(column.getScale());
+                break;
+            default:
+                break;
+        }
+
+        // Null-safe: also copes with a primary key that has no column list.
+        if (PrimaryKey.isPrimaryKeyField(primaryKey, column.getName())) {
+            build.withPrimaryKey(true);
+            if (null != primaryKey.getEnableAutoId()) {
+                build.withAutoID(primaryKey.getEnableAutoId());
+            } else {
+                build.withAutoID(config.get(MilvusSinkConfig.ENABLE_AUTO_ID));
+            }
+        }
+
+        return build.build();
+    }
+
+    /**
+     * Computes the Milvus max length for a string-valued column.
+     *
+     * @param column column whose length is inspected
+     * @return 512 when the column has no length (null or 0), otherwise the column length divided
+     *     by 4
+     */
+    private static int resolveVarcharMaxLength(Column column) {
+        Long columnLength = column.getColumnLength();
+        if (columnLength == null || columnLength == 0) {
+            return 512;
+        }
+        // NOTE(review): the division by 4 presumably converts a byte length into a character
+        // count — confirm against the sources that populate columnLength.
+        return (int) (columnLength / 4);
+    }
+
+    /**
+     * Drops a collection.
+     *
+     * @param tablePath database and collection to drop
+     * @param ignoreIfNotExists when {@code true}, a missing collection is silently accepted
+     * @throws MilvusConnectorException if the server reports a non-success status
+     */
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (ignoreIfNotExists && !tableExists(tablePath)) {
+            return;
+        }
+        R<RpcStatus> response =
+                this.client.dropCollection(
+                        DropCollectionParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .withCollectionName(tablePath.getTableName())
+                                .build());
+        // Do not swallow server-side failures: the caller must know the drop did not happen.
+        if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    response.getMessage(),
+                    response.getException());
+        }
+    }
+
+    /**
+     * Creates a database.
+     *
+     * @param tablePath path whose database name is created
+     * @param ignoreIfExists when {@code true}, an already existing database is left untouched
+     *     instead of being reported as an error
+     * @throws MilvusConnectorException if the server reports a non-success status
+     */
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        // Honor the flag: without this check the server's "already exists" failure would be
+        // raised even though the caller asked to ignore it.
+        if (ignoreIfExists && databaseExists(tablePath.getDatabaseName())) {
+            return;
+        }
+        R<RpcStatus> response =
+                this.client.createDatabase(
+                        CreateDatabaseParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .build());
+        if (!R.success().getStatus().equals(response.getStatus())) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.CREATE_DATABASE_ERROR, response.getMessage());
+        }
+    }
+
+    /**
+     * Drops a database.
+     *
+     * @param tablePath path whose database name is dropped
+     * @param ignoreIfNotExists when {@code true}, a missing database is silently accepted
+     * @throws MilvusConnectorException if the server reports a non-success status
+     */
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (ignoreIfNotExists && !databaseExists(tablePath.getDatabaseName())) {
+            return;
+        }
+        R<RpcStatus> response =
+                this.client.dropDatabase(
+                        DropDatabaseParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .build());
+        // Do not swallow server-side failures: the caller must know the drop did not happen.
+        if (!Objects.equals(response.getStatus(), R.success().getStatus())) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    response.getMessage(),
+                    response.getException());
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java
new file mode 100644
index 0000000000..292c0464f2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalogFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import com.google.auto.service.AutoService;
+
+/** {@link CatalogFactory} that hands out {@link MilvusCatalog} instances. */
+@AutoService(Factory.class)
+public class MilvusCatalogFactory implements CatalogFactory {
+
+    /** Value reported by {@link #factoryIdentifier()}. */
+    private static final String IDENTIFIER = "Milvus";
+
+    /** @return the identifier under which this factory is looked up */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** @return an option rule without any declared options */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+
+    /**
+     * Creates a catalog bound to the given name and configuration.
+     *
+     * @param catalogName name passed through to the catalog
+     * @param options configuration passed through to the catalog
+     * @return a new {@link MilvusCatalog}
+     */
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new MilvusCatalog(catalogName, options);
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java
similarity index 70%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java
index 838a384809..b589b21d3d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusOptions.java
@@ -14,27 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.seatunnel.connectors.seatunnel.milvus.catalog;
-package org.apache.seatunnel.api.table.type;
+public class MilvusOptions {
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
+ public static final String ENABLE_DYNAMIC_FIELD = "enableDynamicField";
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
new file mode 100644
index 0000000000..d2357e559c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+
+import java.util.Arrays;
+
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
+/** Option definitions understood by the Milvus sink. */
+public class MilvusSinkConfig {
+
+    /** Plugin name reported by the sink. */
+    public static final String CONNECTOR_IDENTITY = "Milvus";
+
+    /** {@code url}: Milvus endpoint; no default. */
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus public endpoint");
+
+    /** {@code token}: authentication token; no default. */
+    public static final Option<String> TOKEN =
+            Options.key("token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus token for authentication");
+
+    /** {@code database}: target database; no default. */
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("database");
+
+    /** {@code schema_save_mode}: defaults to {@code CREATE_SCHEMA_WHEN_NOT_EXIST}. */
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    /**
+     * {@code data_save_mode}: one of {@code DROP_DATA}, {@code APPEND_DATA} or
+     * {@code ERROR_WHEN_DATA_EXISTS}; defaults to {@code APPEND_DATA}.
+     */
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription("data_save_mode");
+
+    /** {@code enable_auto_id}: defaults to {@code false}. */
+    public static final Option<Boolean> ENABLE_AUTO_ID =
+            Options.key("enable_auto_id")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable Auto Id");
+
+    /** {@code enable_upsert}: defaults to {@code true}. */
+    public static final Option<Boolean> ENABLE_UPSERT =
+            Options.key("enable_upsert")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable upsert mode");
+
+    /** {@code enable_dynamic_field}: defaults to {@code true}. */
+    public static final Option<Boolean> ENABLE_DYNAMIC_FIELD =
+            Options.key("enable_dynamic_field")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable dynamic field");
+
+    /** {@code batch_size}: writer batch size; defaults to {@code 1000}. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("writer batch size");
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
new file mode 100644
index 0000000000..aa92286ac0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/** Option definitions understood by the Milvus source. */
+public class MilvusSourceConfig {
+
+    /** {@code url}: Milvus endpoint; no default. */
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus public endpoint");
+
+    /** {@code token}: authentication token; no default. */
+    public static final Option<String> TOKEN =
+            Options.key("token")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus token for authentication");
+
+    /** {@code database}: database to read from; defaults to {@code "default"}. */
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .defaultValue("default")
+                    .withDescription("database");
+
+    /** {@code collection}: collection to read; no default. */
+    public static final Option<String> COLLECTION =
+            Options.key("collection")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Milvus collection to read");
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
new file mode 100644
index 0000000000..6b2661680b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.convert;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.VectorIndex;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.api.table.type.VectorType;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Lists;
+
+import com.google.protobuf.ProtocolStringList;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.common.utils.JacksonUtils;
+import io.milvus.grpc.CollectionSchema;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.DescribeIndexResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.IndexDescription;
+import io.milvus.grpc.KeyValuePair;
+import io.milvus.grpc.ShowCollectionsResponse;
+import io.milvus.grpc.ShowType;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.ShowCollectionsParam;
+import io.milvus.param.index.DescribeIndexParam;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MilvusConvertUtils {
+
+ private static final String CATALOG_NAME = "Milvus";
+
+    /**
+     * Resolves the collections to read and describes each of them as a {@link CatalogTable}.
+     *
+     * <p>When the {@code collection} option is set, only that collection is described;
+     * otherwise every collection listed for the configured database is described. The client
+     * opened for this discovery is closed before the method returns, whether it succeeds or
+     * throws.
+     *
+     * @param config source configuration providing url, token, database and collection
+     * @return catalog tables keyed by {@code TablePath.of(database, collection)}
+     * @throws MilvusConnectorException if listing collections fails, if the database has no
+     *     collections, or if describing a collection or its indexes fails
+     */
+    public static Map<TablePath, CatalogTable> getSourceTables(ReadonlyConfig config) {
+        MilvusServiceClient client =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(config.get(MilvusSourceConfig.URL))
+                                .withToken(config.get(MilvusSourceConfig.TOKEN))
+                                .build());
+        try {
+            String database = config.get(MilvusSourceConfig.DATABASE);
+            List<String> collectionList = new ArrayList<>();
+            if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION))) {
+                collectionList.add(config.get(MilvusSourceConfig.COLLECTION));
+            } else {
+                R<ShowCollectionsResponse> response =
+                        client.showCollections(
+                                ShowCollectionsParam.newBuilder()
+                                        .withDatabaseName(database)
+                                        .withShowType(ShowType.All)
+                                        .build());
+                if (response.getStatus() != R.Status.Success.getCode()) {
+                    throw new MilvusConnectorException(
+                            MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR);
+                }
+
+                ProtocolStringList collections = response.getData().getCollectionNamesList();
+                if (CollectionUtils.isEmpty(collections)) {
+                    throw new MilvusConnectorException(
+                            MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS, database);
+                }
+                collectionList.addAll(collections);
+            }
+
+            Map<TablePath, CatalogTable> map = new HashMap<>();
+            for (String collection : collectionList) {
+                CatalogTable catalogTable = getCatalogTable(client, database, collection);
+                map.put(TablePath.of(database, collection), catalogTable);
+            }
+            return map;
+        } finally {
+            // The client is created only for this lookup and is not handed to the caller,
+            // so it must be released here; otherwise its connection leaks.
+            client.close();
+        }
+    }
+
+    /**
+     * Describes one Milvus collection as a {@link CatalogTable}.
+     *
+     * <p>Columns and the primary key come from the collection schema; the described indexes
+     * are attached as a single {@code VECTOR_INDEX_KEY} constraint named
+     * {@code "vector_index"}. The schema's dynamic-field flag is stored in the table options.
+     *
+     * @param client connected Milvus client; it is not closed here
+     * @param database database that owns the collection
+     * @param collection collection to describe
+     * @return the catalog table for the collection
+     * @throws MilvusConnectorException if describing the collection or its indexes does not
+     *     return the success status
+     */
+    public static CatalogTable getCatalogTable(
+            MilvusServiceClient client, String database, String collection) {
+        DescribeCollectionParam describeCollection =
+                DescribeCollectionParam.newBuilder()
+                        .withDatabaseName(database)
+                        .withCollectionName(collection)
+                        .build();
+        R<DescribeCollectionResponse> collectionResult =
+                client.describeCollection(describeCollection);
+        if (collectionResult.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_COLLECTION_ERROR);
+        }
+
+        // Columns, one per field of the collection schema.
+        CollectionSchema schema = collectionResult.getData().getSchema();
+        List<FieldSchema> fields = schema.getFieldsList();
+        List<Column> columns = new ArrayList<>();
+        for (FieldSchema field : fields) {
+            columns.add(MilvusConvertUtils.convertColumn(field));
+        }
+
+        // Primary key, taken from the same field list.
+        PrimaryKey primaryKey = buildPrimaryKey(fields);
+
+        // Indexes defined on the collection.
+        DescribeIndexParam describeIndex =
+                DescribeIndexParam.newBuilder()
+                        .withDatabaseName(database)
+                        .withCollectionName(collection)
+                        .build();
+        R<DescribeIndexResponse> indexResult = client.describeIndex(describeIndex);
+        if (indexResult.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.DESC_INDEX_ERROR);
+        }
+        List<ConstraintKey.ConstraintKeyColumn> vectorIndexes =
+                buildVectorIndexes(indexResult.getData());
+
+        ConstraintKey vectorIndexKey =
+                ConstraintKey.of(
+                        ConstraintKey.ConstraintType.VECTOR_INDEX_KEY,
+                        "vector_index",
+                        vectorIndexes);
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .columns(columns)
+                        .primaryKey(primaryKey)
+                        .constraintKey(vectorIndexKey)
+                        .build();
+
+        TableIdentifier tableId = TableIdentifier.of(CATALOG_NAME, database, collection);
+
+        // Carry the dynamic-field flag along as a table option.
+        Map<String, String> options = new HashMap<>();
+        String dynamicFieldFlag = String.valueOf(schema.getEnableDynamicField());
+        options.put(MilvusOptions.ENABLE_DYNAMIC_FIELD, dynamicFieldFlag);
+
+        return CatalogTable.of(
+                tableId, tableSchema, options, new ArrayList<>(), schema.getDescription());
+    }
+
+    /**
+     * Converts the index descriptions of a collection into constraint-key columns.
+     *
+     * <p>Each description becomes a {@link VectorIndex} built from its index name, field name
+     * and the {@code index_type} / {@code metric_type} entries of its parameter list.
+     *
+     * @param indexResponse describe-index response of the collection
+     * @return one entry per index description; an empty list (never {@code null}) when the
+     *     response carries no descriptions, so that the resulting constraint key never holds
+     *     a {@code null} column list
+     */
+    private static List<ConstraintKey.ConstraintKeyColumn> buildVectorIndexes(
+            DescribeIndexResponse indexResponse) {
+        List<ConstraintKey.ConstraintKeyColumn> list = new ArrayList<>();
+        if (CollectionUtils.isEmpty(indexResponse.getIndexDescriptionsList())) {
+            return list;
+        }
+
+        for (IndexDescription per : indexResponse.getIndexDescriptionsList()) {
+            Map<String, String> paramsMap =
+                    per.getParamsList().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            KeyValuePair::getKey, KeyValuePair::getValue));
+
+            VectorIndex index =
+                    new VectorIndex(
+                            per.getIndexName(),
+                            per.getFieldName(),
+                            paramsMap.get("index_type"),
+                            paramsMap.get("metric_type"));
+
+            list.add(index);
+        }
+
+        return list;
+    }
+
+    /**
+     * Builds the primary key from the first field flagged as primary key.
+     *
+     * @param fields fields of the collection schema
+     * @return a primary key named after that field, holding the field as its only column and
+     *     the field's auto-id flag; {@code null} when no field is flagged as primary key
+     */
+    public static PrimaryKey buildPrimaryKey(List<FieldSchema> fields) {
+        return fields.stream()
+                .filter(FieldSchema::getIsPrimaryKey)
+                .findFirst()
+                .map(
+                        keyField ->
+                                PrimaryKey.of(
+                                        keyField.getName(),
+                                        Lists.newArrayList(keyField.getName()),
+                                        keyField.getAutoID()))
+                .orElse(null);
+    }
+
+ public static PhysicalColumn convertColumn(FieldSchema fieldSchema) {
+ DataType dataType = fieldSchema.getDataType();
+ PhysicalColumn.PhysicalColumnBuilder builder =
PhysicalColumn.builder();
+ builder.name(fieldSchema.getName());
+ builder.sourceType(dataType.name());
+ builder.comment(fieldSchema.getDescription());
+
+ switch (dataType) {
+ case Bool:
+ builder.dataType(BasicType.BOOLEAN_TYPE);
+ break;
+ case Int8:
+ builder.dataType(BasicType.BYTE_TYPE);
+ break;
+ case Int16:
+ builder.dataType(BasicType.SHORT_TYPE);
+ break;
+ case Int32:
+ builder.dataType(BasicType.INT_TYPE);
+ break;
+ case Int64:
+ builder.dataType(BasicType.LONG_TYPE);
+ break;
+ case Float:
+ builder.dataType(BasicType.FLOAT_TYPE);
+ break;
+ case Double:
+ builder.dataType(BasicType.DOUBLE_TYPE);
+ break;
+ case VarChar:
+ builder.dataType(BasicType.STRING_TYPE);
+ for (KeyValuePair keyValuePair :
fieldSchema.getTypeParamsList()) {
+ if (keyValuePair.getKey().equals("max_length")) {
+
builder.columnLength(Long.parseLong(keyValuePair.getValue()) * 4);
+ break;
+ }
+ }
+ break;
+ case String:
+ case JSON:
+ builder.dataType(BasicType.STRING_TYPE);
+ break;
+ case Array:
+ builder.dataType(ArrayType.STRING_ARRAY_TYPE);
+ break;
+ case FloatVector:
+ builder.dataType(VectorType.VECTOR_FLOAT_TYPE);
+ for (KeyValuePair keyValuePair :
fieldSchema.getTypeParamsList()) {
+ if (keyValuePair.getKey().equals("dim")) {
+
builder.scale(Integer.valueOf(keyValuePair.getValue()));
+ break;
+ }
+ }
+ break;
+ case BinaryVector:
+ builder.dataType(VectorType.VECTOR_BINARY_TYPE);
+ for (KeyValuePair keyValuePair :
fieldSchema.getTypeParamsList()) {
+ if (keyValuePair.getKey().equals("dim")) {
+
builder.scale(Integer.valueOf(keyValuePair.getValue()));
+ break;
+ }
+ }
+ break;
+ case SparseFloatVector:
+ builder.dataType(VectorType.VECTOR_SPARSE_FLOAT_TYPE);
+ break;
+ case Float16Vector:
+ builder.dataType(VectorType.VECTOR_FLOAT16_TYPE);
+ for (KeyValuePair keyValuePair :
fieldSchema.getTypeParamsList()) {
+ if (keyValuePair.getKey().equals("dim")) {
+
builder.scale(Integer.valueOf(keyValuePair.getValue()));
+ break;
+ }
+ }
+ break;
+ case BFloat16Vector:
+ builder.dataType(VectorType.VECTOR_BFLOAT16_TYPE);
+ for (KeyValuePair keyValuePair :
fieldSchema.getTypeParamsList()) {
+ if (keyValuePair.getKey().equals("dim")) {
+
builder.scale(Integer.valueOf(keyValuePair.getValue()));
+ break;
+ }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported data
type: " + dataType);
+ }
+
+ return builder.build();
+ }
+
+    /**
+     * Converts a SeaTunnel field value into the Java value handed to Milvus.
+     *
+     * <p>Scalar types are re-parsed from the value's string form. {@code FLOAT_VECTOR} values
+     * are read as an {@code Object[]} and returned as a {@code List<Float>}. Arrays of
+     * STRING, INT, BIGINT, FLOAT and DOUBLE are returned as lists. {@code ROW} and
+     * {@code MAP} values are serialized to JSON strings.
+     *
+     * @param fieldType SeaTunnel type of the field
+     * @param value field value; must match the Java representation expected for
+     *     {@code fieldType}
+     * @return the converted value
+     * @throws MilvusConnectorException with {@code NOT_SUPPORT_TYPE} if the type, or the
+     *     element type of an array, is not handled
+     */
+    public static Object convertBySeaTunnelType(SeaTunnelDataType<?> fieldType, Object value) {
+        SqlType sqlType = fieldType.getSqlType();
+        switch (sqlType) {
+            case INT:
+                return Integer.parseInt(value.toString());
+            case BIGINT:
+                return Long.parseLong(value.toString());
+            case SMALLINT:
+                return Short.parseShort(value.toString());
+            case STRING:
+            case DATE:
+                return value.toString();
+            case FLOAT_VECTOR:
+                List<Float> vector = new ArrayList<>();
+                for (Object o : (Object[]) value) {
+                    vector.add(Float.parseFloat(o.toString()));
+                }
+                return vector;
+            case FLOAT:
+                return Float.parseFloat(value.toString());
+            case BOOLEAN:
+                return Boolean.parseBoolean(value.toString());
+            case DOUBLE:
+                return Double.parseDouble(value.toString());
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
+                SqlType elementSqlType = arrayType.getElementType().getSqlType();
+                switch (elementSqlType) {
+                    case STRING:
+                        String[] stringArray = (String[]) value;
+                        return Arrays.asList(stringArray);
+                    case INT:
+                        Integer[] intArray = (Integer[]) value;
+                        return Arrays.asList(intArray);
+                    case BIGINT:
+                        Long[] longArray = (Long[]) value;
+                        return Arrays.asList(longArray);
+                    case FLOAT:
+                        Float[] floatArray = (Float[]) value;
+                        return Arrays.asList(floatArray);
+                    case DOUBLE:
+                        Double[] doubleArray = (Double[]) value;
+                        return Arrays.asList(doubleArray);
+                    default:
+                        // Without this branch an unsupported element type fell through into
+                        // the ROW case below and failed with a ClassCastException.
+                        throw new MilvusConnectorException(
+                                MilvusConnectionErrorCode.NOT_SUPPORT_TYPE,
+                                sqlType.name() + "<" + elementSqlType.name() + ">");
+                }
+            case ROW:
+                SeaTunnelRow row = (SeaTunnelRow) value;
+                return JsonUtils.toJsonString(row.getFields());
+            case MAP:
+                return JacksonUtils.toJsonString(value);
+            default:
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.NOT_SUPPORT_TYPE, sqlType.name());
+        }
+    }
+
+ public static DataType convertSqlTypeToDataType(SqlType sqlType) {
+ switch (sqlType) {
+ case BOOLEAN:
+ return DataType.Bool;
+ case TINYINT:
+ return DataType.Int8;
+ case SMALLINT:
+ return DataType.Int16;
+ case INT:
+ return DataType.Int32;
+ case BIGINT:
+ return DataType.Int64;
+ case FLOAT:
+ return DataType.Float;
+ case DOUBLE:
+ return DataType.Double;
+ case STRING:
+ return DataType.VarChar;
+ case ARRAY:
+ return DataType.Array;
+ case FLOAT_VECTOR:
+ return DataType.FloatVector;
+ case BINARY_VECTOR:
+ return DataType.BinaryVector;
+ case FLOAT16_VECTOR:
+ return DataType.Float16Vector;
+ case BFLOAT16_VECTOR:
+ return DataType.BFloat16Vector;
+ case SPARSE_FLOAT_VECTOR:
+ return DataType.SparseFloatVector;
+ case DATE:
+ return DataType.VarChar;
+ case ROW:
+ return DataType.VarChar;
+ }
+ throw new CatalogException(
+ String.format("Not support convert to milvus type, sqlType is
%s", sqlType));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java
new file mode 100644
index 0000000000..3acc3de804
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectionErrorCode.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum MilvusConnectionErrorCode implements SeaTunnelErrorCode {
+ SERVER_RESPONSE_FAILED("MILVUS-01", "Milvus server response error"),
+ COLLECTION_NOT_FOUND("MILVUS-02", "Collection not found"),
+ FIELD_NOT_FOUND("MILVUS-03", "Field not found"),
+ DESC_COLLECTION_ERROR("MILVUS-04", "Desc collection error"),
+ SHOW_COLLECTIONS_ERROR("MILVUS-05", "Show collections error"),
+ COLLECTION_NOT_LOADED("MILVUS-06", "Collection not loaded"),
+ NOT_SUPPORT_TYPE("MILVUS-07", "Type not support yet"),
+ DATABASE_NO_COLLECTIONS("MILVUS-08", "Database no any collections"),
+ SOURCE_TABLE_SCHEMA_IS_NULL("MILVUS-09", "Source table schema is null"),
+ FIELD_IS_NULL("MILVUS-10", "Field is null"),
+ CLOSE_CLIENT_ERROR("MILVUS-11", "Close client error"),
+ DESC_INDEX_ERROR("MILVUS-12", "Desc index error"),
+ CREATE_DATABASE_ERROR("MILVUS-13", "Create database error"),
+ CREATE_COLLECTION_ERROR("MILVUS-14", "Create collection error"),
+ CREATE_INDEX_ERROR("MILVUS-15", "Create index error"),
+ ;
+
+ private final String code;
+ private final String description;
+
+ MilvusConnectionErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
new file mode 100644
index 0000000000..df6ea7adca
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/exception/MilvusConnectorException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class MilvusConnectorException extends SeaTunnelRuntimeException {
+ public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode) {
+ super(seaTunnelErrorCode, seaTunnelErrorCode.getErrorMessage());
+ }
+
+ public MilvusConnectorException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage,
Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public MilvusConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
new file mode 100644
index 0000000000..c5b1b82bcc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Milvus sink: creates the writer and committer for a catalog table and, when a catalog
+ * table is available, a save-mode handler backed by the Milvus catalog.
+ */
+public class MilvusSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow,
+                        MilvusSinkState,
+                        MilvusCommitInfo,
+                        MilvusAggregatedCommitInfo>,
+                SupportSaveMode {
+
+    private final ReadonlyConfig config;
+    private final CatalogTable catalogTable;
+
+    /**
+     * @param config sink configuration
+     * @param catalogTable table to write; may be {@code null}, in which case no save-mode
+     *     handler is offered
+     */
+    public MilvusSink(ReadonlyConfig config, CatalogTable catalogTable) {
+        this.config = config;
+        this.catalogTable = catalogTable;
+    }
+
+    /** @return the connector identity declared in {@link MilvusSinkConfig} */
+    @Override
+    public String getPluginName() {
+        return MilvusSinkConfig.CONNECTOR_IDENTITY;
+    }
+
+    /** Creates a writer that starts from an empty state list. */
+    @Override
+    public SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> createWriter(
+            SinkWriter.Context context) {
+        List<MilvusSinkState> noStates = Collections.emptyList();
+        return new MilvusSinkWriter(context, catalogTable, config, noStates);
+    }
+
+    /** Creates a writer that is handed the given restored states. */
+    @Override
+    public SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> restoreWriter(
+            SinkWriter.Context context, List<MilvusSinkState> states) {
+        return new MilvusSinkWriter(context, catalogTable, config, states);
+    }
+
+    /** @return a default serializer for the writer state */
+    @Override
+    public Optional<Serializer<MilvusSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /** @return a default serializer for the commit info */
+    @Override
+    public Optional<Serializer<MilvusCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /** @return a new {@link MilvusSinkCommitter} built from the sink configuration */
+    @Override
+    public Optional<SinkCommitter<MilvusCommitInfo>> createCommitter() {
+        return Optional.of(new MilvusSinkCommitter(config));
+    }
+
+    /**
+     * Builds the save-mode handler from the configured schema and data save modes.
+     *
+     * <p>The catalog is opened here, before it is passed to the handler.
+     *
+     * @return the handler, or an empty optional when no catalog table was supplied
+     */
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        if (catalogTable == null) {
+            return Optional.empty();
+        }
+
+        CatalogFactory factory = new MilvusCatalogFactory();
+        Catalog milvusCatalog = factory.createCatalog(catalogTable.getCatalogName(), config);
+
+        SchemaSaveMode schemaMode = config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataMode = config.get(MilvusSinkConfig.DATA_SAVE_MODE);
+
+        milvusCatalog.open();
+        SaveModeHandler handler =
+                new DefaultSaveModeHandler(
+                        schemaMode,
+                        dataMode,
+                        milvusCatalog,
+                        catalogTable.getTablePath(),
+                        catalogTable,
+                        null);
+        return Optional.of(handler);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
new file mode 100644
index 0000000000..8c23bc62e6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkCommitter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Committer of the Milvus sink. In its current form it does no work: {@link #commit(List)}
+ * reports that nothing needs a retry and {@link #abort(List)} is empty.
+ */
+@Slf4j
+public class MilvusSinkCommitter implements SinkCommitter<MilvusCommitInfo> {
+
+    /**
+     * Creates the committer.
+     *
+     * @param pluginConfig sink options; accepted but not used by this implementation
+     */
+    public MilvusSinkCommitter(ReadonlyConfig pluginConfig) {
+        // Nothing to initialise.
+    }
+
+    /**
+     * Commits the given commit messages. The interface contract asks for an idempotent
+     * implementation; this one performs no action at all.
+     *
+     * @param commitInfos the commit messages to commit
+     * @return the commit messages that must be retried, always an empty list here
+     * @throws IOException declared by the interface; never thrown by this implementation
+     */
+    @Override
+    public List<MilvusCommitInfo> commit(List<MilvusCommitInfo> commitInfos) throws IOException {
+        final List<MilvusCommitInfo> needRetry = Collections.emptyList();
+        return needRetry;
+    }
+
+    /**
+     * Aborts the transaction. According to the interface documentation this is called (**Only**
+     * on Spark engine) when the commit failed. This implementation does nothing.
+     *
+     * @param commitInfos the commit messages that would be used to abort the commit
+     * @throws IOException declared by the interface; never thrown by this implementation
+     */
+    @Override
+    public void abort(List<MilvusCommitInfo> commitInfos) throws IOException {
+        // Intentionally empty.
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
new file mode 100644
index 0000000000..6ea5b5a2ff
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory that registers the Milvus sink under the identifier {@code "Milvus"} and builds {@link
+ * MilvusSink} instances from the job options.
+ */
+@AutoService(Factory.class)
+public class MilvusSinkFactory implements TableSinkFactory {
+
+    /** @return the identifier under which this factory is registered */
+    @Override
+    public String factoryIdentifier() {
+        return "Milvus";
+    }
+
+    /**
+     * Declares the options understood by the sink.
+     *
+     * <p>{@code DATABASE} and {@code BATCH_SIZE} are listed as optional because the sink reads
+     * both of them ({@link #renameCatalogTable} and {@code MilvusSinkWriter} respectively).
+     *
+     * @return the option rule: URL and TOKEN are required, everything else is optional
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MilvusSinkConfig.URL, MilvusSinkConfig.TOKEN)
+                .optional(
+                        MilvusSinkConfig.DATABASE,
+                        MilvusSinkConfig.BATCH_SIZE,
+                        MilvusSinkConfig.ENABLE_UPSERT,
+                        MilvusSinkConfig.ENABLE_DYNAMIC_FIELD,
+                        MilvusSinkConfig.ENABLE_AUTO_ID,
+                        MilvusSinkConfig.SCHEMA_SAVE_MODE,
+                        MilvusSinkConfig.DATA_SAVE_MODE)
+                .build();
+    }
+
+    /**
+     * Creates the table sink for the given context.
+     *
+     * @param context factory context carrying the sink options and the upstream catalog table
+     * @return a {@link TableSink} that creates a {@link MilvusSink} for the (possibly re-targeted)
+     *     catalog table
+     */
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig config = context.getOptions();
+        CatalogTable catalogTable = renameCatalogTable(config, context.getCatalogTable());
+        return () -> new MilvusSink(config, catalogTable);
+    }
+
+    /**
+     * Re-targets the upstream table to the database configured on the sink.
+     *
+     * @param config sink options
+     * @param sourceCatalogTable catalog table received from upstream
+     * @return a copy of {@code sourceCatalogTable} whose identifier uses the configured database
+     *     name, or the upstream database name when none is configured (null or empty)
+     */
+    private CatalogTable renameCatalogTable(
+            ReadonlyConfig config, CatalogTable sourceCatalogTable) {
+        TableIdentifier sourceTableId = sourceCatalogTable.getTableId();
+        String configuredDatabase = config.get(MilvusSinkConfig.DATABASE);
+        String databaseName =
+                StringUtils.isNotEmpty(configuredDatabase)
+                        ? configuredDatabase
+                        : sourceTableId.getDatabaseName();
+
+        TableIdentifier newTableId =
+                TableIdentifier.of(
+                        sourceTableId.getCatalogName(),
+                        databaseName,
+                        sourceTableId.getSchemaName(),
+                        sourceTableId.getTableName());
+
+        return CatalogTable.of(newTableId, sourceCatalogTable);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
new file mode 100644
index 0000000000..7c823838c5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBatchWriter;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch.MilvusBufferBatchWriter;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
+
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE;
+
+/** MilvusSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Milvus. */
+@Slf4j
+public class MilvusSinkWriter
+        implements SinkWriter<SeaTunnelRow, MilvusCommitInfo, MilvusSinkState> {
+    private final Context context;
+
+    private final ReadonlyConfig config;
+    private MilvusBatchWriter batchWriter;
+
+    /**
+     * Connects to Milvus with the configured URL and token and prepares a buffering batch writer.
+     *
+     * @param context runtime context supplied by the engine
+     * @param catalogTable table whose rows are written; its name selects the target collection
+     * @param config sink options (URL, token, batch size, upsert and auto-id flags)
+     * @param milvusSinkStates previously captured writer states; not consulted by this constructor
+     */
+    public MilvusSinkWriter(
+            Context context,
+            CatalogTable catalogTable,
+            ReadonlyConfig config,
+            List<MilvusSinkState> milvusSinkStates) {
+        this.context = context;
+        // config must be assigned before getAutoId(...) is called below.
+        this.config = config;
+        ConnectConfig connectConfig =
+                ConnectConfig.builder()
+                        .uri(config.get(MilvusSinkConfig.URL))
+                        .token(config.get(MilvusSinkConfig.TOKEN))
+                        .build();
+        this.batchWriter =
+                new MilvusBufferBatchWriter(
+                        catalogTable,
+                        config.get(BATCH_SIZE),
+                        getAutoId(catalogTable.getTableSchema().getPrimaryKey()),
+                        config.get(MilvusSinkConfig.ENABLE_UPSERT),
+                        new MilvusClientV2(connectConfig));
+    }
+
+    /**
+     * Buffers one row and flushes the buffer once the batch writer reports it is full.
+     *
+     * @param element the data need be written.
+     */
+    @Override
+    public void write(SeaTunnelRow element) {
+        batchWriter.addToBatch(element);
+        if (batchWriter.needFlush()) {
+            batchWriter.flush();
+        }
+    }
+
+    /**
+     * Resolves the auto-id flag: the value carried by the primary key wins, otherwise the sink
+     * option {@code ENABLE_AUTO_ID} is used.
+     *
+     * @param primaryKey primary key of the target table, may be null
+     * @return whether auto-id is enabled
+     */
+    private Boolean getAutoId(PrimaryKey primaryKey) {
+        if (null != primaryKey && null != primaryKey.getEnableAutoId()) {
+            return primaryKey.getEnableAutoId();
+        } else {
+            return config.get(MilvusSinkConfig.ENABLE_AUTO_ID);
+        }
+    }
+
+    /**
+     * prepare the commit, will be called before {@link #snapshotState(long checkpointId)}. If you
+     * need to use 2pc, you can return the commit info in this method, and receive the commit info
+     * in {@link SinkCommitter#commit(List)}. If this method failed (by throw exception), **Only**
+     * Spark engine will call {@link #abortPrepare()}
+     *
+     * <p>This implementation flushes the buffered rows and produces no commit info.
+     *
+     * @return the commit info need to commit, always empty here
+     */
+    @Override
+    public Optional<MilvusCommitInfo> prepareCommit() throws IOException {
+        batchWriter.flush();
+        return Optional.empty();
+    }
+
+    /**
+     * Used to abort the {@link #prepareCommit()}, if the prepareCommit failed, there is no
+     * CommitInfoT, so the rollback work cannot be done by {@link SinkCommitter}. But we can use
+     * this method to rollback side effects of {@link #prepareCommit()}. Only use it in Spark engine
+     * at now.
+     *
+     * <p>This implementation does nothing.
+     */
+    @Override
+    public void abortPrepare() {}
+
+    /**
+     * call it when SinkWriter close. Flushes whatever is still buffered and then closes the batch
+     * writer.
+     *
+     * @throws IOException if close failed
+     */
+    @Override
+    public void close() throws IOException {
+        if (batchWriter != null) {
+            try {
+                batchWriter.flush();
+            } finally {
+                // Close even when the final flush fails, otherwise the Milvus client leaks.
+                batchWriter.close();
+            }
+        }
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
similarity index 70%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
index 838a384809..91e04342dc 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBatchWriter.java
@@ -15,26 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+public interface MilvusBatchWriter { // Batching contract the Milvus sink writer delegates to.
+ /** Adds one row to the pending batch. */
+ void addToBatch(SeaTunnelRow element);
+ /** Reports whether the pending batch should be flushed now. */
+ boolean needFlush();
+ /** Writes out the pending batch; MilvusBufferBatchWriter always returns true. */
+ boolean flush();
+ /** Releases whatever the writer holds; MilvusBufferBatchWriter closes its Milvus client. */
+ void close();
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
new file mode 100644
index 0000000000..a323095bc2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/batch/MilvusBufferBatchWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.UpsertReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
+
+public class MilvusBufferBatchWriter implements MilvusBatchWriter {
+
+ private final int batchSize;
+ private final CatalogTable catalogTable;
+ private final Boolean autoId;
+ private final Boolean enableUpsert;
+ private final String collectionName;
+ private MilvusClientV2 milvusClient;
+
+ private volatile List<JSONObject> milvusDataCache;
+ private volatile int writeCount = 0;
+
+ public MilvusBufferBatchWriter(
+ CatalogTable catalogTable,
+ Integer batchSize,
+ Boolean autoId,
+ Boolean enableUpsert,
+ MilvusClientV2 milvusClient) {
+ this.catalogTable = catalogTable;
+ this.autoId = autoId;
+ this.enableUpsert = enableUpsert;
+ this.milvusClient = milvusClient;
+ this.collectionName = catalogTable.getTablePath().getTableName();
+ this.batchSize = batchSize;
+ this.milvusDataCache = new ArrayList<>(batchSize);
+ }
+
+ @Override
+ public void addToBatch(SeaTunnelRow element) {
+ JSONObject data = buildMilvusData(element);
+ milvusDataCache.add(data);
+ writeCount++;
+ }
+
+ @Override
+ public boolean needFlush() {
+ return this.writeCount >= this.batchSize;
+ }
+
+ @Override
+ public synchronized boolean flush() {
+ if (CollectionUtils.isEmpty(this.milvusDataCache)) {
+ return true;
+ }
+ writeData2Collection();
+ this.milvusDataCache = new ArrayList<>(this.batchSize);
+ this.writeCount = 0;
+ return true;
+ }
+
+ @Override
+ public void close() {
+ try {
+ this.milvusClient.close(10);
+ } catch (InterruptedException e) {
+ throw new SeaTunnelException(e);
+ }
+ }
+
+ private JSONObject buildMilvusData(SeaTunnelRow element) {
+ SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+
+ JSONObject data = new JSONObject();
+ for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+ String fieldName = seaTunnelRowType.getFieldNames()[i];
+
+ if (autoId && isPrimaryKeyField(primaryKey, fieldName)) {
+ continue; // if create table open AutoId, then don't need
insert data with
+ // primaryKey field.
+ }
+
+ SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+ Object value = element.getField(i);
+ if (null == value) {
+ throw new MilvusConnectorException(
+ MilvusConnectionErrorCode.FIELD_IS_NULL, fieldName);
+ }
+ data.put(fieldName,
MilvusConvertUtils.convertBySeaTunnelType(fieldType, value));
+ }
+ return data;
+ }
+
+ private void writeData2Collection() {
+ // default to use upsertReq, but upsert only works when autoID is
disabled
+ if (enableUpsert && !autoId) {
+ UpsertReq upsertReq =
+ UpsertReq.builder()
+ .collectionName(this.collectionName)
+ .data(this.milvusDataCache)
+ .build();
+ milvusClient.upsert(upsertReq);
+ } else {
+ InsertReq insertReq =
+ InsertReq.builder()
+ .collectionName(this.collectionName)
+ .data(this.milvusDataCache)
+ .build();
+ milvusClient.insert(insertReq);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
new file mode 100644
index 0000000000..05e9aed769
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bounded SeaTunnel source for Milvus. The catalog tables to read are resolved once in the
+ * constructor and shared with every reader and enumerator created afterwards.
+ */
+public class MilvusSource
+        implements SeaTunnelSource<SeaTunnelRow, MilvusSourceSplit, MilvusSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+
+    private final ReadonlyConfig config;
+    private final Map<TablePath, CatalogTable> sourceTables;
+
+    /**
+     * @param sourceConfig source options; also passed to {@link
+     *     MilvusConvertUtils#getSourceTables} to resolve the tables to read
+     */
+    public MilvusSource(ReadonlyConfig sourceConfig) {
+        this.config = sourceConfig;
+        this.sourceTables = MilvusConvertUtils.getSourceTables(config);
+    }
+
+    /** @return always {@link Boundedness#BOUNDED} */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    /** @return a new list holding the catalog tables resolved in the constructor */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return new ArrayList<>(sourceTables.values());
+    }
+
+    /** Creates a reader that shares this source's config and resolved tables. */
+    @Override
+    public SourceReader<SeaTunnelRow, MilvusSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new MilvusSourceReader(readerContext, config, sourceTables);
+    }
+
+    /** Creates a fresh split enumerator, i.e. one without checkpoint state. */
+    @Override
+    public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws Exception {
+        return new MilvusSourceSplitEnumertor(context, config, sourceTables, null);
+    }
+
+    /** Creates a split enumerator that is handed the given checkpoint state. */
+    @Override
+    public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<MilvusSourceSplit> context,
+            MilvusSourceState checkpointState)
+            throws Exception {
+        return new MilvusSourceSplitEnumertor(context, config, sourceTables, checkpointState);
+    }
+
+    /** @return the plugin name, {@code "Milvus"} */
+    @Override
+    public String getPluginName() {
+        return "Milvus";
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
new file mode 100644
index 0000000000..d511026a85
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+
+@Slf4j
+@AutoService(Factory.class)
+public class MilvusSourceFactory implements TableSourceFactory {
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
MilvusSource(context.getOptions());
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(MilvusSourceConfig.URL, MilvusSourceConfig.TOKEN)
+ .optional(MilvusSourceConfig.DATABASE,
MilvusSourceConfig.COLLECTION)
+ .build();
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return MilvusSource.class;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "Milvus";
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
new file mode 100644
index 0000000000..e52f264264
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.GetLoadStateResponse;
+import io.milvus.grpc.LoadState;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+import io.milvus.param.collection.GetLoadStateParam;
+import io.milvus.param.dml.QueryIteratorParam;
+import io.milvus.response.QueryResultsWrapper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Source reader that, for every split it is assigned, iterates over all rows of the split's
+ * Milvus collection and emits them as {@link SeaTunnelRow}s.
+ */
+@Slf4j
+public class MilvusSourceReader implements SourceReader<SeaTunnelRow, MilvusSourceSplit> {
+
+    private final Deque<MilvusSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
+    private final ReadonlyConfig config;
+    private final Context context;
+    private Map<TablePath, CatalogTable> sourceTables;
+
+    private MilvusServiceClient client;
+
+    private volatile boolean noMoreSplit;
+
+    /**
+     * @param readerContext runtime context supplied by the engine
+     * @param config source options (URL and token are read in {@link #open()})
+     * @param sourceTables catalog tables keyed by table path, used to look up each split's schema
+     */
+    public MilvusSourceReader(
+            Context readerContext,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables) {
+        this.context = readerContext;
+        this.config = config;
+        this.sourceTables = sourceTables;
+    }
+
+    /** Creates the Milvus client from the configured URL and token. */
+    @Override
+    public void open() throws Exception {
+        client =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(config.get(MilvusSourceConfig.URL))
+                                .withToken(config.get(MilvusSourceConfig.TOKEN))
+                                .build());
+    }
+
+    /** Closes the Milvus client if {@link #open()} created one. */
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    /**
+     * Reads one pending split (if any) completely, signals the end of input once no more splits
+     * will arrive and none are pending, and then sleeps for one second.
+     *
+     * @param output collector receiving the rows; its checkpoint lock is held while a split is
+     *     read
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            MilvusSourceSplit split = pendingSplits.poll();
+            if (null != split) {
+                handleEveryRowInternal(split, output);
+            } else {
+                if (!noMoreSplit) {
+                    log.info("Milvus source wait split!");
+                }
+            }
+        }
+        if (noMoreSplit
+                && pendingSplits.isEmpty()
+                && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded milvus source");
+            context.signalNoMoreElement();
+        }
+        Thread.sleep(1000L);
+    }
+
+    /**
+     * Reads every row of the split's collection and forwards it to the collector.
+     *
+     * @throws MilvusConnectorException if the table or its schema is unknown, a Milvus call
+     *     fails, or the collection is not loaded
+     */
+    private void handleEveryRowInternal(MilvusSourceSplit split, Collector<SeaTunnelRow> output) {
+        TablePath tablePath = split.getTablePath();
+        // A split for a table that is not in sourceTables is reported like a missing schema
+        // instead of failing with a bare NullPointerException.
+        CatalogTable catalogTable = sourceTables.get(tablePath);
+        TableSchema tableSchema = catalogTable == null ? null : catalogTable.getTableSchema();
+        if (null == tableSchema) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
+        }
+
+        R<GetLoadStateResponse> loadStateResponse =
+                client.getLoadState(
+                        GetLoadStateParam.newBuilder()
+                                .withDatabaseName(tablePath.getDatabaseName())
+                                .withCollectionName(tablePath.getTableName())
+                                .build());
+        if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+                    loadStateResponse.getException());
+        }
+
+        if (!LoadState.LoadStateLoaded.equals(loadStateResponse.getData().getState())) {
+            throw new MilvusConnectorException(MilvusConnectionErrorCode.COLLECTION_NOT_LOADED);
+        }
+
+        QueryIteratorParam param =
+                QueryIteratorParam.newBuilder()
+                        .withDatabaseName(tablePath.getDatabaseName())
+                        .withCollectionName(tablePath.getTableName())
+                        .withOutFields(Lists.newArrayList("*"))
+                        .build();
+
+        R<QueryIterator> response = client.queryIterator(param);
+        if (response.getStatus() != R.Status.Success.getCode()) {
+            // Report the failure of this call, not the (successful) load-state response.
+            throw new MilvusConnectorException(
+                    MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException());
+        }
+
+        QueryIterator iterator = response.getData();
+        while (true) {
+            List<QueryResultsWrapper.RowRecord> next = iterator.next();
+            if (next == null || next.isEmpty()) {
+                break;
+            }
+            for (QueryResultsWrapper.RowRecord record : next) {
+                output.collect(convertToSeaTunnelRow(record, tableSchema, tablePath));
+            }
+        }
+    }
+
+    /**
+     * Converts one Milvus record into a {@link SeaTunnelRow} laid out according to the table
+     * schema. Values are looked up by field name; values that do not already have the expected
+     * Java type are parsed from their string form.
+     *
+     * @param record record returned by the query iterator
+     * @param tableSchema schema that defines the order and types of the row's fields
+     * @param tablePath path whose full name is stored as the row's table id
+     * @return an INSERT row with exactly one entry per schema field
+     * @throws MilvusConnectorException if a schema field has no value in the record, a vector
+     *     value is not a list, or the field type is not supported
+     */
+    public SeaTunnelRow convertToSeaTunnelRow(
+            QueryResultsWrapper.RowRecord record, TableSchema tableSchema, TablePath tablePath) {
+        SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
+        // Size the row by the schema: the record map may hold more or fewer entries than the
+        // schema has fields, which would yield a wrong arity or an out-of-bounds write.
+        Object[] fields = new Object[typeInfo.getTotalFields()];
+        Map<String, Object> fieldValuesMap = record.getFieldValues();
+        String[] fieldNames = typeInfo.getFieldNames();
+        for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
+            Object filedValues = fieldValuesMap.get(fieldNames[fieldIndex]);
+            if (filedValues == null) {
+                // Fail with the field name instead of a NullPointerException further down.
+                throw new MilvusConnectorException(
+                        MilvusConnectionErrorCode.FIELD_IS_NULL, fieldNames[fieldIndex]);
+            }
+            switch (seaTunnelDataType.getSqlType()) {
+                case STRING:
+                    fields[fieldIndex] = filedValues.toString();
+                    break;
+                case BOOLEAN:
+                    if (filedValues instanceof Boolean) {
+                        fields[fieldIndex] = filedValues;
+                    } else {
+                        fields[fieldIndex] = Boolean.valueOf(filedValues.toString());
+                    }
+                    break;
+                case INT:
+                    if (filedValues instanceof Integer) {
+                        fields[fieldIndex] = filedValues;
+                    } else {
+                        fields[fieldIndex] = Integer.valueOf(filedValues.toString());
+                    }
+                    break;
+                case BIGINT:
+                    if (filedValues instanceof Long) {
+                        fields[fieldIndex] = filedValues;
+                    } else {
+                        fields[fieldIndex] = Long.parseLong(filedValues.toString());
+                    }
+                    break;
+                case FLOAT:
+                    if (filedValues instanceof Float) {
+                        fields[fieldIndex] = filedValues;
+                    } else {
+                        fields[fieldIndex] = Float.parseFloat(filedValues.toString());
+                    }
+                    break;
+                case DOUBLE:
+                    if (filedValues instanceof Double) {
+                        fields[fieldIndex] = filedValues;
+                    } else {
+                        fields[fieldIndex] = Double.parseDouble(filedValues.toString());
+                    }
+                    break;
+                case FLOAT_VECTOR:
+                    if (filedValues instanceof List) {
+                        List<?> list = (List<?>) filedValues;
+                        Float[] arrays = new Float[list.size()];
+                        for (int i = 0; i < list.size(); i++) {
+                            arrays[i] = Float.parseFloat(list.get(i).toString());
+                        }
+                        fields[fieldIndex] = arrays;
+                        break;
+                    } else {
+                        throw new MilvusConnectorException(
+                                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                                "Unexpected vector value: " + filedValues);
+                    }
+                default:
+                    throw new MilvusConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unexpected value: " + seaTunnelDataType.getSqlType().name());
+            }
+        }
+
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setTableId(tablePath.getFullName());
+        seaTunnelRow.setRowKind(RowKind.INSERT);
+        return seaTunnelRow;
+    }
+
+    /** @return a copy of the splits that have been assigned but not read yet */
+    @Override
+    public List<MilvusSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(pendingSplits);
+    }
+
+    /** Queues the given splits for reading. */
+    @Override
+    public void addSplits(List<MilvusSourceSplit> splits) {
+        log.info("Adding milvus splits to reader: {}", splits);
+        pendingSplits.addAll(splits);
+    }
+
+    /** Records that no further splits will be assigned to this reader. */
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("receive no more splits message, this milvus reader will not add new split.");
+        noMoreSplit = true;
+    }
+
+    /** No action is taken when a checkpoint completes. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
similarity index 65%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
index 838a384809..e79d74b6dc 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplit.java
@@ -15,26 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A source split of the Milvus connector, described by a table path and a split id.
+ *
+ * <p>Accessors and the builder are generated by Lombok ({@code @Data}, {@code @Builder}).
+ */
+@Data
+@Builder
+public class MilvusSourceSplit implements SourceSplit {
+
+ /** Path of the table this split belongs to. */
+ private TablePath tablePath;
+ /** Identifier returned by {@link #splitId()}. */
+ private String splitId;
+
+ /** Returns the {@code splitId} field unchanged. */
+ @Override
+ public String splitId() {
+ return splitId;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
new file mode 100644
index 0000000000..e01e9c8ad5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumertor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Split enumerator of the Milvus source.
+ *
+ * <p>Every pending table is turned into exactly one {@link MilvusSourceSplit}. A split is bound to
+ * a reader by hashing its split id and is handed out once that reader is registered. All reads and
+ * writes of {@code pendingSplits} happen while holding {@code stateLock}, because the map is a
+ * plain {@link HashMap}.
+ */
+@Slf4j
+public class MilvusSourceSplitEnumertor
+        implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> {
+
+    private final Map<TablePath, CatalogTable> tables;
+    private final Context<MilvusSourceSplit> context;
+    private final ConcurrentLinkedQueue<TablePath> pendingTables;
+    private final Map<Integer, List<MilvusSourceSplit>> pendingSplits;
+    private final Object stateLock = new Object();
+
+    // Kept for parity with the constructor contract; not read anywhere in this class.
+    private final ReadonlyConfig config;
+
+    /**
+     * Creates the enumerator, either fresh or restored from a checkpoint.
+     *
+     * @param context enumerator context used to query readers and assign splits
+     * @param config source configuration
+     * @param sourceTables tables to read, keyed by table path
+     * @param sourceState restored state, or {@code null} to start with all tables pending
+     */
+    public MilvusSourceSplitEnumertor(
+            Context<MilvusSourceSplit> context,
+            ReadonlyConfig config,
+            Map<TablePath, CatalogTable> sourceTables,
+            MilvusSourceState sourceState) {
+        this.context = context;
+        this.tables = sourceTables;
+        this.config = config;
+        if (sourceState == null) {
+            this.pendingTables = new ConcurrentLinkedQueue<>(tables.keySet());
+            this.pendingSplits = new HashMap<>();
+        } else {
+            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
+            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
+        }
+    }
+
+    @Override
+    public void open() {}
+
+    /**
+     * Splits every pending table, assigns the resulting splits to the readers that were
+     * registered when this method started, and finally signals "no more splits" to those readers.
+     */
+    @Override
+    public void run() throws Exception {
+        log.info("Starting milvus split enumerator.");
+        Set<Integer> readers = context.registeredReaders();
+        while (!pendingTables.isEmpty()) {
+            synchronized (stateLock) {
+                TablePath tablePath = pendingTables.poll();
+                log.info("begin to split table path: {}", tablePath);
+                Collection<MilvusSourceSplit> splits = generateSplits(tables.get(tablePath));
+                log.info("end to split table {} into {} splits.", tablePath, splits.size());
+
+                addPendingSplit(splits);
+            }
+
+            synchronized (stateLock) {
+                assignSplit(readers);
+            }
+        }
+
+        log.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
+    }
+
+    /** Builds the single split (index 0) that covers the whole table. */
+    private Collection<MilvusSourceSplit> generateSplits(CatalogTable table) {
+        log.info("Start splitting table {} into chunks...", table.getTablePath());
+        MilvusSourceSplit milvusSourceSplit =
+                MilvusSourceSplit.builder()
+                        .splitId(createSplitId(table.getTablePath(), 0))
+                        .tablePath(table.getTablePath())
+                        .build();
+
+        return Collections.singletonList(milvusSourceSplit);
+    }
+
+    /** Formats a split id as {@code <tablePath>-<index>}. */
+    protected String createSplitId(TablePath tablePath, int index) {
+        return String.format("%s-%s", tablePath, index);
+    }
+
+    /** Queues each split under the reader chosen by {@link #getSplitOwner}. Caller holds the lock. */
+    private void addPendingSplit(Collection<MilvusSourceSplit> splits) {
+        int readerCount = context.currentParallelism();
+        for (MilvusSourceSplit split : splits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            log.info("Assigning {} to {} reader.", split, ownerReader);
+
+            pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
+        }
+    }
+
+    /** Maps a split id to a reader index; the mask keeps the hash non-negative before the modulo. */
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    /** Removes and hands out the queued splits of every given reader. Caller holds the lock. */
+    private void assignSplit(Collection<Integer> readers) {
+        log.info("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<MilvusSourceSplit> assignmentForReader = pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
+                log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
+                context.assignSplit(reader, assignmentForReader);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    /**
+     * Re-queues splits returned by a reader and re-assigns them immediately when that reader is
+     * currently registered; otherwise they stay pending until it registers again.
+     */
+    @Override
+    public void addSplitsBack(List<MilvusSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            synchronized (stateLock) {
+                addPendingSplit(splits, subtaskId);
+                if (context.registeredReaders().contains(subtaskId)) {
+                    assignSplit(Collections.singletonList(subtaskId));
+                } else {
+                    log.warn(
+                            "Reader {} is not registered. Pending splits {} are not assigned.",
+                            subtaskId,
+                            splits);
+                }
+            }
+        }
+        // Fixed: the message previously named the JDBC enumerator (copy-paste leftover).
+        log.info("Add back splits {} to MilvusSourceSplitEnumerator.", splits.size());
+    }
+
+    /** Queues all splits under an explicitly given reader. Caller holds the lock. */
+    private void addPendingSplit(Collection<MilvusSourceSplit> splits, int ownerReader) {
+        pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
+    }
+
+    /** Returns 0 when neither tables nor splits are pending, 1 otherwise (a flag, not a count). */
+    @Override
+    public int currentUnassignedSplitSize() {
+        synchronized (stateLock) {
+            return pendingTables.isEmpty() && pendingSplits.isEmpty() ? 0 : 1;
+        }
+    }
+
+    /** Split requests are not supported; splits are pushed to readers by the enumerator. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        throw new MilvusConnectorException(
+                CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", subtaskId));
+    }
+
+    /** Assigns any splits already queued for the newly registered reader. */
+    @Override
+    public void registerReader(int subtaskId) {
+        log.info("Register reader {} to MilvusSourceSplitEnumerator.", subtaskId);
+        synchronized (stateLock) {
+            // The emptiness check is done under the lock: pendingSplits is a plain HashMap.
+            if (!pendingSplits.isEmpty()) {
+                assignSplit(Collections.singletonList(subtaskId));
+            }
+        }
+    }
+
+    /**
+     * Snapshots the pending tables and pending splits.
+     *
+     * <p>The per-reader lists are copied as well, so the returned state does not share mutable
+     * lists with this enumerator.
+     */
+    @Override
+    public MilvusSourceState snapshotState(long checkpointId) throws Exception {
+        synchronized (stateLock) {
+            Map<Integer, List<MilvusSourceSplit>> splitsCopy = new HashMap<>();
+            pendingSplits.forEach(
+                    (reader, splits) -> splitsCopy.put(reader, new ArrayList<>(splits)));
+            return new MilvusSourceState(new ArrayList<>(pendingTables), splitsCopy);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
similarity index 64%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
index 838a384809..7b6c2e0672 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceState.java
@@ -15,26 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.source;
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable state holding a list of pending table paths and a map of pending splits.
+ *
+ * <p>Accessors and the all-args constructor are generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class MilvusSourceState implements Serializable {
+ /** Table paths still pending. */
+ private List<TablePath> pendingTables;
+ /** Pending splits grouped under an integer key. */
+ private Map<Integer, List<MilvusSourceSplit>> pendingSplits;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java
similarity index 70%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java
index 838a384809..d4bc422d9b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusAggregatedCommitInfo.java
@@ -15,26 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.state;
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Serializable container for a list of {@link MilvusCommitInfo} objects. */
+@Data
+@AllArgsConstructor
+public class MilvusAggregatedCommitInfo implements Serializable {
+ /** The contained commit infos (package-private field; accessors come from Lombok). */
+ List<MilvusCommitInfo> commitInfos;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java
similarity index 70%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java
index 838a384809..f6887ffa06 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusCommitInfo.java
@@ -15,26 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.state;
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
-}
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/** Serializable commit-info type that currently declares no fields. */
+@Data
+@AllArgsConstructor
+public class MilvusCommitInfo implements Serializable {}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java
similarity index 70%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
copy to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java
index 838a384809..3d8ff62b1d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/state/MilvusSinkState.java
@@ -15,26 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.connectors.seatunnel.milvus.state;
-/** The sql type of {@link SeaTunnelDataType}. */
-public enum SqlType {
- ARRAY,
- MAP,
- STRING,
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- BIGINT,
- FLOAT,
- DOUBLE,
- DECIMAL,
- NULL,
- BYTES,
- DATE,
- TIME,
- TIMESTAMP,
- ROW,
- MULTIPLE_ROW;
-}
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.experimental.SuperBuilder;
+
+import java.io.Serializable;
+
+/** Serializable sink-state type that currently declares no fields. */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+public class MilvusSinkState implements Serializable {}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 0498ff4539..68274736f0 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -77,6 +77,7 @@
<module>connector-paimon</module>
<module>connector-easysearch</module>
<module>connector-web3j</module>
+ <module>connector-milvus</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 37f1cbebf4..a5dd203f83 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -576,6 +576,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-milvus</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- jdbc driver -->
<dependency>
<groupId>com.aliyun.phoenix</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml
new file mode 100644
index 0000000000..2175811c6c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-milvus-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Milvus</name>
+
+ <properties>
+ <testcontainer.milvus.version>1.19.8</testcontainer.milvus.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-milvus</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.9</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>milvus</artifactId>
+ <version>${testcontainer.milvus.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
new file mode 100644
index 0000000000..5356433057
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.v2.milvus;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.milvus.MilvusContainer;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.client.MilvusServiceClient;
+import io.milvus.grpc.DataType;
+import io.milvus.grpc.DescribeCollectionResponse;
+import io.milvus.grpc.FieldSchema;
+import io.milvus.grpc.MutationResult;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.IndexType;
+import io.milvus.param.MetricType;
+import io.milvus.param.R;
+import io.milvus.param.RpcStatus;
+import io.milvus.param.collection.CreateCollectionParam;
+import io.milvus.param.collection.DescribeCollectionParam;
+import io.milvus.param.collection.FieldType;
+import io.milvus.param.collection.HasCollectionParam;
+import io.milvus.param.collection.LoadCollectionParam;
+import io.milvus.param.dml.InsertParam;
+import io.milvus.param.index.CreateIndexParam;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test that copies a Milvus collection to another database through the Milvus source
+ * and sink.
+ *
+ * <p>{@link #startUp()} starts a Milvus container and seeds the {@code simple_example}
+ * collection with ten rows; {@link #testMilvus(TestContainer)} runs the job and checks that the
+ * collection and its three fields exist in database {@code test}.
+ */
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK not support adapt")
+public class MilvusIT extends TestSuiteBase implements TestResource {
+
+    private static final String HOST = "milvus-e2e";
+    private static final String MILVUS_IMAGE = "milvusdb/milvus:2.4-20240711-7e2a9d6b";
+    private static final String TOKEN = "root:Milvus";
+    private MilvusContainer container;
+    private MilvusServiceClient milvusClient;
+    private static final String COLLECTION_NAME = "simple_example";
+    private static final String ID_FIELD = "book_id";
+    private static final String VECTOR_FIELD = "book_intro";
+    private static final String TITLE_FIELD = "book_title";
+    private static final Integer VECTOR_DIM = 4;
+
+    /** Starts the container, connects the client (retrying until it succeeds) and seeds data. */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.container =
+                new MilvusContainer(MILVUS_IMAGE).withNetwork(NETWORK).withNetworkAliases(HOST);
+        Startables.deepStart(Stream.of(this.container)).join();
+        log.info("Milvus host is {}", container.getHost());
+        log.info("Milvus container started");
+        // The condition chain needs a terminal until* call; without it nothing is awaited.
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(720L, TimeUnit.SECONDS)
+                .untilAsserted(this::initMilvus);
+        this.initSourceData();
+    }
+
+    /** Creates the Milvus client against the container endpoint using the root token. */
+    private void initMilvus()
+            throws SQLException, ClassNotFoundException, InstantiationException,
+                    IllegalAccessException {
+        milvusClient =
+                new MilvusServiceClient(
+                        ConnectParam.newBuilder()
+                                .withUri(this.container.getEndpoint())
+                                .withToken(TOKEN)
+                                .build());
+    }
+
+    /** Creates, indexes and loads the source collection, then inserts ten rows into it. */
+    private void initSourceData() {
+        // Define fields
+        List<FieldType> fieldsSchema =
+                Arrays.asList(
+                        FieldType.newBuilder()
+                                .withName(ID_FIELD)
+                                .withDataType(DataType.Int64)
+                                .withPrimaryKey(true)
+                                .withAutoID(false)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(VECTOR_FIELD)
+                                .withDataType(DataType.FloatVector)
+                                .withDimension(VECTOR_DIM)
+                                .build(),
+                        FieldType.newBuilder()
+                                .withName(TITLE_FIELD)
+                                .withDataType(DataType.VarChar)
+                                .withMaxLength(64)
+                                .build());
+
+        // Create the collection with 3 fields
+        R<RpcStatus> ret =
+                milvusClient.createCollection(
+                        CreateCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldTypes(fieldsSchema)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to create collection! Error: " + ret.getMessage());
+        }
+
+        // Specify an index type on the vector field.
+        ret =
+                milvusClient.createIndex(
+                        CreateIndexParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withFieldName(VECTOR_FIELD)
+                                .withIndexType(IndexType.FLAT)
+                                .withMetricType(MetricType.L2)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException(
+                    "Failed to create index on vector field! Error: " + ret.getMessage());
+        }
+
+        // Call loadCollection() to enable automatically loading data into memory for searching
+        ret =
+                milvusClient.loadCollection(
+                        LoadCollectionParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        if (ret.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to load collection! Error: " + ret.getMessage());
+        }
+
+        log.info("Collection created");
+
+        // Insert 10 records into the collection
+        List<JSONObject> rows = new ArrayList<>();
+        for (long i = 1L; i <= 10; ++i) {
+            JSONObject row = new JSONObject();
+            row.put(ID_FIELD, i);
+            List<Float> vector = Arrays.asList((float) i, (float) i, (float) i, (float) i);
+            row.put(VECTOR_FIELD, vector);
+            row.put(TITLE_FIELD, "Tom and Jerry " + i);
+            rows.add(row);
+        }
+
+        R<MutationResult> insertRet =
+                milvusClient.insert(
+                        InsertParam.newBuilder()
+                                .withCollectionName(COLLECTION_NAME)
+                                .withRows(rows)
+                                .build());
+        if (insertRet.getStatus() != R.Status.Success.getCode()) {
+            throw new RuntimeException("Failed to insert! Error: " + insertRet.getMessage());
+        }
+    }
+
+    /** Closes the client and the container; tolerates a start-up that failed half-way. */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (this.milvusClient != null) {
+            this.milvusClient.close();
+        }
+        if (this.container != null) {
+            this.container.close();
+        }
+    }
+
+    /** Runs the Milvus-to-Milvus job and verifies the target collection and its field names. */
+    @TestTemplate
+    public void testMilvus(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/milvus-to-milvus.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // assert table exist
+        R<Boolean> hasCollectionResponse =
+                this.milvusClient.hasCollection(
+                        HasCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        assertSuccess(hasCollectionResponse, "check collection existence");
+        Assertions.assertTrue(hasCollectionResponse.getData());
+
+        // check table fields
+        R<DescribeCollectionResponse> describeCollectionResponseR =
+                this.milvusClient.describeCollection(
+                        DescribeCollectionParam.newBuilder()
+                                .withDatabaseName("test")
+                                .withCollectionName(COLLECTION_NAME)
+                                .build());
+        assertSuccess(describeCollectionResponseR, "describe collection");
+
+        DescribeCollectionResponse data = describeCollectionResponseR.getData();
+        List<String> fields =
+                data.getSchema().getFieldsList().stream()
+                        .map(FieldSchema::getName)
+                        .collect(Collectors.toList());
+        Assertions.assertTrue(fields.contains(ID_FIELD));
+        Assertions.assertTrue(fields.contains(VECTOR_FIELD));
+        Assertions.assertTrue(fields.contains(TITLE_FIELD));
+    }
+
+    /** Fails the test with the server message when an RPC did not return the success status. */
+    private static void assertSuccess(R<?> response, String action) {
+        Assertions.assertTrue(
+                response.getStatus() == R.Status.Success.getCode(),
+                "Failed to " + action + ": " + response.getMessage());
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
new file mode 100644
index 0000000000..5b5b9aec78
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/milvus-to-milvus.conf
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ }
+}
+
+sink {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ database="test"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 47864f21c6..0a0f909e19 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -74,6 +74,7 @@
<module>connector-cdc-oracle-e2e</module>
<module>connector-hive-e2e</module>
<module>connector-hudi-e2e</module>
+ <module>connector-milvus-e2e</module>
</modules>
<dependencies>