This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5247e17640 [improve] milvus options (#9165)
5247e17640 is described below
commit 5247e176409c94fd020370ac7d58b88e9d396621
Author: Jarvis <[email protected]>
AuthorDate: Mon Apr 14 10:11:40 2025 +0800
[improve] milvus options (#9165)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 --
.../seatunnel/milvus/catalog/MilvusCatalog.java | 22 +++++++++++-----------
...vusCommonConfig.java => MilvusBaseOptions.java} | 2 +-
...ilvusSinkConfig.java => MilvusSinkOptions.java} | 2 +-
...sSourceConfig.java => MilvusSourceOptions.java} | 2 +-
.../milvus/sink/MilvusBufferBatchWriter.java | 16 ++++++++--------
.../seatunnel/milvus/sink/MilvusSink.java | 8 ++++----
.../seatunnel/milvus/sink/MilvusSinkFactory.java | 18 +++++++++---------
.../seatunnel/milvus/source/MilvusSource.java | 4 ++--
.../milvus/source/MilvusSourceFactory.java | 6 +++---
.../milvus/source/MilvusSourceReader.java | 10 +++++-----
.../milvus/source/MilvusSourceSplitEnumerator.java | 6 +++---
.../seatunnel/milvus/utils/MilvusConvertUtils.java | 12 ++++++------
.../milvus/utils/sink/MilvusSinkConverter.java | 4 ++--
.../e2e/connector/v2/milvus/MilvusIT.java | 6 +++---
15 files changed, 59 insertions(+), 61 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index e0dd66b0c3..071513e5f0 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -204,13 +204,11 @@ public class ConnectorOptionCheckTest {
whiteList.add("SlsSourceOptions");
whiteList.add("SentrySinkOptions");
whiteList.add("QdrantSinkOptions");
- whiteList.add("MilvusSourceOptions");
whiteList.add("RocketMqSinkOptions");
whiteList.add("MaxcomputeSourceOptions");
whiteList.add("KuduSourceOptions");
whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
- whiteList.add("MilvusSinkOptions");
whiteList.add("RocketMqSourceOptions");
whiteList.add("TablestoreSinkOptions");
whiteList.add("TableStoreDBSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
index 99717b75fa..21edbda72b 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java
@@ -33,7 +33,7 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.CommonOptions;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.milvus.utils.sink.MilvusSinkConverter;
@@ -71,7 +71,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.CREATE_INDEX;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.CREATE_INDEX;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@Slf4j
@@ -91,8 +91,8 @@ public class MilvusCatalog implements Catalog {
public void open() throws CatalogException {
ConnectParam connectParam =
ConnectParam.newBuilder()
- .withUri(config.get(MilvusSinkConfig.URL))
- .withToken(config.get(MilvusSinkConfig.TOKEN))
+ .withUri(config.get(MilvusSinkOptions.URL))
+ .withToken(config.get(MilvusSinkOptions.TOKEN))
.build();
try {
this.client = new MilvusServiceClient(connectParam);
@@ -242,9 +242,9 @@ public class MilvusCatalog implements Catalog {
String partitionKeyField =
existPartitionKeyField ?
options.get(MilvusOptions.PARTITION_KEY_FIELD) : null;
// if options set, will overwrite aut read
- if
(StringUtils.isNotEmpty(config.get(MilvusSinkConfig.PARTITION_KEY))) {
+ if
(StringUtils.isNotEmpty(config.get(MilvusSinkOptions.PARTITION_KEY))) {
existPartitionKeyField = true;
- partitionKeyField = config.get(MilvusSinkConfig.PARTITION_KEY);
+ partitionKeyField =
config.get(MilvusSinkOptions.PARTITION_KEY);
}
TableSchema tableSchema = catalogTable.getTableSchema();
@@ -261,21 +261,21 @@ public class MilvusCatalog implements Catalog {
column,
tableSchema.getPrimaryKey(),
partitionKeyField,
- config.get(MilvusSinkConfig.ENABLE_AUTO_ID));
+ config.get(MilvusSinkOptions.ENABLE_AUTO_ID));
fieldTypes.add(fieldType);
}
Boolean enableDynamicField =
(options.containsKey(MilvusOptions.ENABLE_DYNAMIC_FIELD))
?
Boolean.valueOf(options.get(MilvusOptions.ENABLE_DYNAMIC_FIELD))
- :
config.get(MilvusSinkConfig.ENABLE_DYNAMIC_FIELD);
+ :
config.get(MilvusSinkOptions.ENABLE_DYNAMIC_FIELD);
String collectionDescription = "";
- if (config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION) != null
- && config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION)
+ if (config.get(MilvusSinkOptions.COLLECTION_DESCRIPTION) != null
+ && config.get(MilvusSinkOptions.COLLECTION_DESCRIPTION)
.containsKey(tablePath.getTableName())) {
// use description from config first
collectionDescription =
- config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION)
+ config.get(MilvusSinkOptions.COLLECTION_DESCRIPTION)
.get(tablePath.getTableName());
} else if (null != catalogTable.getComment()) {
collectionDescription = catalogTable.getComment();
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusCommonConfig.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusCommonConfig.java
rename to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
index fe2e5cfc69..956701e74d 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusCommonConfig.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java
@@ -20,7 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.milvus.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public abstract class MilvusCommonConfig {
+public abstract class MilvusBaseOptions {
public static final String CONNECTOR_IDENTITY = "Milvus";
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
similarity index 98%
rename from
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
rename to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
index 8d874fc0ae..1c7d00f3a9 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
@@ -30,7 +30,7 @@ import static
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-public class MilvusSinkConfig extends MilvusCommonConfig {
+public class MilvusSinkOptions extends MilvusBaseOptions {
public static final Option<String> DATABASE =
Options.key("database").stringType().noDefaultValue().withDescription("database");
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceOptions.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
rename to
seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceOptions.java
index 94b9854838..2fe91dc399 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceOptions.java
@@ -20,7 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.milvus.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class MilvusSourceConfig extends MilvusCommonConfig {
+public class MilvusSourceOptions extends MilvusBaseOptions {
public static final Option<String> DATABASE =
Options.key("database")
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java
index 36949075f3..0aeb5b72d6 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java
@@ -53,14 +53,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.CREATE_INDEX;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_AUTO_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_UPSERT;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.LOAD_COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.RATE_LIMIT;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.TOKEN;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.URL;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.CREATE_INDEX;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.ENABLE_AUTO_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.ENABLE_UPSERT;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.LOAD_COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.RATE_LIMIT;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.TOKEN;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.URL;
@Slf4j
public class MilvusBufferBatchWriter {
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
index 9167d806df..984fa2c75e 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -33,7 +33,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusCatalogFactory;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
@@ -90,7 +90,7 @@ public class MilvusSink
@Override
public String getPluginName() {
- return MilvusSinkConfig.CONNECTOR_IDENTITY;
+ return MilvusSinkOptions.CONNECTOR_IDENTITY;
}
@Override
@@ -102,8 +102,8 @@ public class MilvusSink
CatalogFactory catalogFactory = new MilvusCatalogFactory();
Catalog catalog =
catalogFactory.createCatalog(catalogTable.getCatalogName(), config);
- SchemaSaveMode schemaSaveMode =
config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
- DataSaveMode dataSaveMode =
config.get(MilvusSinkConfig.DATA_SAVE_MODE);
+ SchemaSaveMode schemaSaveMode =
config.get(MilvusSinkOptions.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode =
config.get(MilvusSinkOptions.DATA_SAVE_MODE);
return Optional.of(
new DefaultSaveModeHandler(
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
index 6ea5b5a2ff..ab4d2d4b92 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java
@@ -25,7 +25,7 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
import org.apache.commons.lang3.StringUtils;
@@ -42,13 +42,13 @@ public class MilvusSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(MilvusSinkConfig.URL, MilvusSinkConfig.TOKEN)
+ .required(MilvusSinkOptions.URL, MilvusSinkOptions.TOKEN)
.optional(
- MilvusSinkConfig.ENABLE_UPSERT,
- MilvusSinkConfig.ENABLE_DYNAMIC_FIELD,
- MilvusSinkConfig.ENABLE_AUTO_ID,
- MilvusSinkConfig.SCHEMA_SAVE_MODE,
- MilvusSinkConfig.DATA_SAVE_MODE)
+ MilvusSinkOptions.ENABLE_UPSERT,
+ MilvusSinkOptions.ENABLE_DYNAMIC_FIELD,
+ MilvusSinkOptions.ENABLE_AUTO_ID,
+ MilvusSinkOptions.SCHEMA_SAVE_MODE,
+ MilvusSinkOptions.DATA_SAVE_MODE)
.build();
}
@@ -62,8 +62,8 @@ public class MilvusSinkFactory implements TableSinkFactory {
ReadonlyConfig config, CatalogTable sourceCatalogTable) {
TableIdentifier sourceTableId = sourceCatalogTable.getTableId();
String databaseName;
- if (StringUtils.isNotEmpty(config.get(MilvusSinkConfig.DATABASE))) {
- databaseName = config.get(MilvusSinkConfig.DATABASE);
+ if (StringUtils.isNotEmpty(config.get(MilvusSinkOptions.DATABASE))) {
+ databaseName = config.get(MilvusSinkOptions.DATABASE);
} else {
databaseName = sourceTableId.getDatabaseName();
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
index 17112d0b75..60e9c7689b 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.utils.MilvusConvertUtils;
import java.util.ArrayList;
@@ -79,6 +79,6 @@ public class MilvusSource
@Override
public String getPluginName() {
- return MilvusSourceConfig.CONNECTOR_IDENTITY;
+ return MilvusSourceOptions.CONNECTOR_IDENTITY;
}
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
index d511026a85..48adafdf88 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
@@ -44,8 +44,8 @@ public class MilvusSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(MilvusSourceConfig.URL, MilvusSourceConfig.TOKEN)
- .optional(MilvusSourceConfig.DATABASE,
MilvusSourceConfig.COLLECTION)
+ .required(MilvusSourceOptions.URL, MilvusSourceOptions.TOKEN)
+ .optional(MilvusSourceOptions.DATABASE,
MilvusSourceOptions.COLLECTION)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
index 316aa51488..6cbc8f5ee3 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.milvus.utils.source.MilvusSourceConverter;
@@ -59,8 +59,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig.BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig.RATE_LIMIT;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions.RATE_LIMIT;
@Slf4j
public class MilvusSourceReader implements SourceReader<SeaTunnelRow,
MilvusSourceSplit> {
@@ -88,8 +88,8 @@ public class MilvusSourceReader implements
SourceReader<SeaTunnelRow, MilvusSour
client =
new MilvusServiceClient(
ConnectParam.newBuilder()
- .withUri(config.get(MilvusSourceConfig.URL))
-
.withToken(config.get(MilvusSourceConfig.TOKEN))
+ .withUri(config.get(MilvusSourceOptions.URL))
+
.withToken(config.get(MilvusSourceOptions.TOKEN))
.build());
setRateLimit(config.get(RATE_LIMIT).toString());
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
index e415b0cfff..1308a73bef 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
@@ -81,8 +81,8 @@ public class MilvusSourceSplitEnumerator
public void open() {
ConnectParam connectParam =
ConnectParam.newBuilder()
- .withUri(config.get(MilvusSourceConfig.URL))
- .withToken(config.get(MilvusSourceConfig.TOKEN))
+ .withUri(config.get(MilvusSourceOptions.URL))
+ .withToken(config.get(MilvusSourceOptions.TOKEN))
.build();
this.client = new MilvusServiceClient(connectParam);
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
index 8c8d9b616a..ae9b7ccde3 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/MilvusConvertUtils.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.VectorIndex;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusOptions;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.milvus.utils.source.MilvusSourceConverter;
@@ -77,14 +77,14 @@ public class MilvusConvertUtils {
MilvusServiceClient client =
new MilvusServiceClient(
ConnectParam.newBuilder()
- .withUri(config.get(MilvusSourceConfig.URL))
-
.withToken(config.get(MilvusSourceConfig.TOKEN))
+ .withUri(config.get(MilvusSourceOptions.URL))
+
.withToken(config.get(MilvusSourceOptions.TOKEN))
.build());
- String database = config.get(MilvusSourceConfig.DATABASE);
+ String database = config.get(MilvusSourceOptions.DATABASE);
List<String> collectionList = new ArrayList<>();
- if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION)))
{
- collectionList.add(config.get(MilvusSourceConfig.COLLECTION));
+ if
(StringUtils.isNotEmpty(config.get(MilvusSourceOptions.COLLECTION))) {
+ collectionList.add(config.get(MilvusSourceOptions.COLLECTION));
} else {
R<ShowCollectionsResponse> response =
client.showCollections(
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java
index 0ca373468c..10e15b50f1 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/sink/MilvusSinkConverter.java
@@ -49,8 +49,8 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.api.table.catalog.PrimaryKey.isPrimaryKeyField;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_AUTO_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_DYNAMIC_FIELD;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.ENABLE_AUTO_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.ENABLE_DYNAMIC_FIELD;
public class MilvusSinkConverter {
private static final Gson gson = new Gson();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 1e04e9266a..8d780035e5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusCatalog;
-import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -128,8 +128,8 @@ public class MilvusIT extends TestSuiteBase implements
TestResource {
throws SQLException, ClassNotFoundException,
InstantiationException,
IllegalAccessException {
Map<String, Object> config = new HashMap<>();
- config.put(MilvusSinkConfig.URL.key(), this.container.getEndpoint());
- config.put(MilvusSinkConfig.TOKEN.key(), TOKEN);
+ config.put(MilvusSinkOptions.URL.key(), this.container.getEndpoint());
+ config.put(MilvusSinkOptions.TOKEN.key(), TOKEN);
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(config);
catalog = new MilvusCatalog(COLLECTION_NAME, readonlyConfig);
catalog.open();