This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fdacbae1af [Improve] maxcompute options (#9163)
fdacbae1af is described below

commit fdacbae1af73f9866d2f42191ccc320db52b446e
Author: Jarvis <[email protected]>
AuthorDate: Mon Apr 14 14:18:47 2025 +0800

    [Improve] maxcompute options (#9163)
    
    Co-authored-by: Jia Fan <[email protected]>
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 .../maxcompute/catalog/MaxComputeCatalog.java      | 32 ++++++-----
 .../catalog/MaxComputeCatalogFactory.java          | 24 ++++----
 .../catalog/MaxComputeDataTypeConvertor.java       |  8 +--
 ...mputeConfig.java => MaxcomputeBaseOptions.java} | 66 ++++------------------
 ...mputeConfig.java => MaxcomputeSinkOptions.java} | 60 +-------------------
 .../maxcompute/config/MaxcomputeSourceOptions.java | 32 +++++++++++
 .../datatype/MaxComputeTypeConverter.java          | 13 +++--
 .../maxcompute/sink/MaxComputeSaveModeHandler.java | 15 +++--
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java  | 16 +++---
 .../maxcompute/sink/MaxcomputeSinkFactory.java     | 39 ++++++-------
 .../maxcompute/sink/MaxcomputeWriter.java          | 17 +++---
 .../maxcompute/source/MaxcomputeSource.java        | 65 ++++++++++++---------
 .../maxcompute/source/MaxcomputeSourceFactory.java | 29 ++++------
 .../source/MaxcomputeSourceSplitEnumerator.java    |  5 +-
 .../seatunnel/maxcompute/util/MaxcomputeUtil.java  | 30 +++++-----
 .../catalog/MaxComputeDataTypeConvertorTest.java   |  4 +-
 17 files changed, 191 insertions(+), 266 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 3bac19ff2f..24566951d3 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,7 +195,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("PulsarSinkOptions");
         whiteList.add("SlsSinkOptions");
         whiteList.add("Neo4jSinkOptions");
-        whiteList.add("MaxcomputeSinkOptions");
         whiteList.add("TDengineSourceOptions");
         whiteList.add("PulsarSourceOptions");
         whiteList.add("MongodbSinkOptions");
@@ -203,7 +202,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("SentrySinkOptions");
         whiteList.add("QdrantSinkOptions");
         whiteList.add("RocketMqSinkOptions");
-        whiteList.add("MaxcomputeSourceOptions");
         whiteList.add("SocketSinkOptions");
         whiteList.add("SelectDBSinkOptions");
         whiteList.add("RocketMqSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index d0e0503188..12eb8b4667 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -33,6 +33,8 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
 
@@ -55,12 +57,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 
 @Slf4j
@@ -78,7 +74,10 @@ public class MaxComputeCatalog implements Catalog {
 
     @Override
     public void open() throws CatalogException {
-        account = new AliyunAccount(readonlyConfig.get(ACCESS_ID), 
readonlyConfig.get(ACCESS_KEY));
+        account =
+                new AliyunAccount(
+                        readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_ID),
+                        readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_KEY));
     }
 
     @Override
@@ -91,13 +90,13 @@ public class MaxComputeCatalog implements Catalog {
 
     @Override
     public String getDefaultDatabase() throws CatalogException {
-        return readonlyConfig.get(PROJECT);
+        return readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
     }
 
     @Override
     public boolean databaseExists(String databaseName) throws CatalogException 
{
         try {
-            Odps odps = getOdps(readonlyConfig.get(PROJECT));
+            Odps odps = 
getOdps(readonlyConfig.get(MaxcomputeBaseOptions.PROJECT));
             Projects projects = odps.projects();
             return projects.exists(databaseName);
         } catch (OdpsException e) {
@@ -109,7 +108,7 @@ public class MaxComputeCatalog implements Catalog {
     public List<String> listDatabases() throws CatalogException {
         try {
             // todo: how to get all projects
-            String project = readonlyConfig.get(PROJECT);
+            String project = readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
             if (databaseExists(project)) {
                 return Lists.newArrayList(project);
             }
@@ -210,7 +209,8 @@ public class MaxComputeCatalog implements Catalog {
             SQLTask.run(
                             odps,
                             MaxComputeCatalogUtil.getCreateTableStatement(
-                                    
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
+                                    readonlyConfig.get(
+                                            
MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
                                     tablePath,
                                     table))
                     .waitForSuccess();
@@ -250,8 +250,10 @@ public class MaxComputeCatalog implements Catalog {
             Odps odps = getOdps(tablePath.getDatabaseName());
             Table odpsTable = odps.tables().get(tablePath.getTableName());
             if (odpsTable.isPartitioned()
-                    && 
StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
-                PartitionSpec partitionSpec = new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+                    && StringUtils.isNotEmpty(
+                            
readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC))) {
+                PartitionSpec partitionSpec =
+                        new 
PartitionSpec(readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC));
                 odpsTable.deletePartition(partitionSpec, ignoreIfNotExists);
                 odpsTable.createPartition(partitionSpec, true);
             } else {
@@ -313,7 +315,7 @@ public class MaxComputeCatalog implements Catalog {
             checkArgument(catalogTable.isPresent(), "CatalogTable cannot be 
null");
             return new SQLPreviewResult(
                     MaxComputeCatalogUtil.getCreateTableStatement(
-                            readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
+                            
readonlyConfig.get(MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
                             tablePath,
                             catalogTable.get()));
         } else if (actionType == ActionType.DROP_TABLE) {
@@ -325,7 +327,7 @@ public class MaxComputeCatalog implements Catalog {
 
     private Odps getOdps(String project) {
         Odps odps = new Odps(account);
-        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+        odps.setEndpoint(readonlyConfig.get(MaxcomputeBaseOptions.ENDPOINT));
         odps.setDefaultProject(project);
         return odps;
     }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
index b170b79d48..b8f6187617 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
@@ -23,18 +23,10 @@ import 
org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
 @AutoService(Factory.class)
 public class MaxComputeCatalogFactory implements CatalogFactory {
 
@@ -45,14 +37,22 @@ public class MaxComputeCatalogFactory implements 
CatalogFactory {
 
     @Override
     public String factoryIdentifier() {
-        return PLUGIN_NAME;
+        return MaxcomputeBaseOptions.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
-                .optional(PARTITION_SPEC, SPLIT_ROW, 
ConnectorCommonOptions.SCHEMA)
+                .required(
+                        MaxcomputeBaseOptions.ACCESS_ID,
+                        MaxcomputeBaseOptions.ACCESS_KEY,
+                        MaxcomputeBaseOptions.ENDPOINT,
+                        MaxcomputeBaseOptions.PROJECT,
+                        MaxcomputeBaseOptions.TABLE_NAME)
+                .optional(
+                        MaxcomputeBaseOptions.PARTITION_SPEC,
+                        MaxcomputeBaseOptions.SPLIT_ROW,
+                        ConnectorCommonOptions.SCHEMA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
index 17886b87c3..e6d4271703 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
 
 import org.apache.commons.collections4.MapUtils;
@@ -82,7 +82,7 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
                     return ArrayType.DOUBLE_ARRAY_TYPE;
                 default:
                     throw CommonError.convertToSeaTunnelTypeError(
-                            MaxcomputeConfig.PLUGIN_NAME, connectorDataType, 
field);
+                            MaxcomputeBaseOptions.PLUGIN_NAME, 
connectorDataType, field);
             }
         }
         if (connectorDataType.startsWith("STRUCT")) {
@@ -141,7 +141,7 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
                 return BasicType.VOID_TYPE;
             default:
                 throw CommonError.convertToSeaTunnelTypeError(
-                        MaxcomputeConfig.PLUGIN_NAME, connectorDataType, 
field);
+                        MaxcomputeBaseOptions.PLUGIN_NAME, connectorDataType, 
field);
         }
     }
 
@@ -183,6 +183,6 @@ public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo>
 
     @Override
     public String getIdentity() {
-        return MaxcomputeConfig.PLUGIN_NAME;
+        return MaxcomputeBaseOptions.PLUGIN_NAME;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeBaseOptions.java
similarity index 58%
copy from 
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
copy to 
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeBaseOptions.java
index a4515380f7..f4abe04810 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeBaseOptions.java
@@ -21,19 +21,15 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
-import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-public class MaxcomputeConfig implements Serializable {
+public class MaxcomputeBaseOptions implements Serializable {
 
     public static final String PLUGIN_NAME = "Maxcompute";
 
-    private static final int SPLIT_ROW_DEFAULT = 10000;
     public static final Option<String> ACCESS_ID =
             Options.key("accessId")
                     .stringType()
@@ -51,72 +47,34 @@ public class MaxcomputeConfig implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Your Maxcompute endpoint start with 
http");
+
     public static final Option<String> PROJECT =
             Options.key("project")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Your Maxcompute project which is created 
in Alibaba Cloud");
+
+    public static final Option<List<Map<String, Object>>> TABLE_LIST =
+            Options.key("table_list")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription("List of tables to be written to 
MaxCompute.");
+
     public static final Option<String> TABLE_NAME =
             Options.key("table_name")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Target Maxcompute table name eg: fake");
+
     public static final Option<String> PARTITION_SPEC =
             Options.key("partition_spec")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("This spec of Maxcompute partition 
table.");
+
     public static final Option<Integer> SPLIT_ROW =
             Options.key("split_row")
                     .intType()
-                    .defaultValue(SPLIT_ROW_DEFAULT)
+                    .defaultValue(10000)
                     .withDescription("Number of rows per split. default: 
10000");
-    public static final Option<Boolean> OVERWRITE =
-            Options.key("overwrite")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Whether to overwrite the table or 
partition");
-
-    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
-            Options.key("schema_save_mode")
-                    .enumType(SchemaSaveMode.class)
-                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
-                    .withDescription("schema_save_mode");
-
-    public static final Option<DataSaveMode> DATA_SAVE_MODE =
-            Options.key("data_save_mode")
-                    .enumType(DataSaveMode.class)
-                    .defaultValue(DataSaveMode.APPEND_DATA)
-                    .withDescription("data_save_mode");
-
-    public static final Option<String> CUSTOM_SQL =
-            
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
-
-    public static final Option<List<String>> READ_COLUMNS =
-            Options.key("read_columns")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("The read columns of the table");
-
-    public static final Option<List<Map<String, Object>>> TABLE_LIST =
-            Options.key("table_list")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription("List of tables to be written to 
MaxCompute.");
-
-    // create table
-    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
-            Options.key("save_mode_create_template")
-                    .stringType()
-                    .defaultValue(
-                            "CREATE TABLE IF NOT EXISTS `"
-                                    + 
SaveModePlaceHolder.TABLE.getPlaceHolder()
-                                    + "` (\n"
-                                    + 
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
-                                    + "\n"
-                                    + ") COMMENT '"
-                                    + 
SaveModePlaceHolder.COMMENT.getPlaceHolder()
-                                    + "' ;")
-                    .withDescription(
-                            "Create table statement template, used to create 
MaxCompute table");
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
similarity index 52%
rename from 
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
rename to 
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
index a4515380f7..8a6da00f00 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
@@ -17,60 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class MaxcomputeConfig implements Serializable {
+public class MaxcomputeSinkOptions extends MaxcomputeBaseOptions {
 
-    public static final String PLUGIN_NAME = "Maxcompute";
-
-    private static final int SPLIT_ROW_DEFAULT = 10000;
-    public static final Option<String> ACCESS_ID =
-            Options.key("accessId")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Your Maxcompute accessId which cloud be access 
from Alibaba Cloud");
-    public static final Option<String> ACCESS_KEY =
-            Options.key("accesskey")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Your Maxcompute accessKey which cloud be access 
from Alibaba Cloud");
-    public static final Option<String> ENDPOINT =
-            Options.key("endpoint")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Your Maxcompute endpoint start with 
http");
-    public static final Option<String> PROJECT =
-            Options.key("project")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Your Maxcompute project which is created 
in Alibaba Cloud");
-    public static final Option<String> TABLE_NAME =
-            Options.key("table_name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Target Maxcompute table name eg: fake");
-    public static final Option<String> PARTITION_SPEC =
-            Options.key("partition_spec")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("This spec of Maxcompute partition 
table.");
-    public static final Option<Integer> SPLIT_ROW =
-            Options.key("split_row")
-                    .intType()
-                    .defaultValue(SPLIT_ROW_DEFAULT)
-                    .withDescription("Number of rows per split. default: 
10000");
     public static final Option<Boolean> OVERWRITE =
             Options.key("overwrite")
                     .booleanType()
@@ -92,18 +46,6 @@ public class MaxcomputeConfig implements Serializable {
     public static final Option<String> CUSTOM_SQL =
             
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
 
-    public static final Option<List<String>> READ_COLUMNS =
-            Options.key("read_columns")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("The read columns of the table");
-
-    public static final Option<List<Map<String, Object>>> TABLE_LIST =
-            Options.key("table_list")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription("List of tables to be written to 
MaxCompute.");
-
     // create table
     public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
             Options.key("save_mode_create_template")
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSourceOptions.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSourceOptions.java
new file mode 100644
index 0000000000..452223d566
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSourceOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class MaxcomputeSourceOptions extends MaxcomputeBaseOptions {
+
+    public static final Option<List<String>> READ_COLUMNS =
+            Options.key("read_columns")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The read columns of the table");
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
index 0c81dacc2a..ed950d4d3c 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
 
 import com.aliyun.odps.OdpsType;
 import com.aliyun.odps.type.AbstractCharTypeInfo;
@@ -47,8 +48,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-
 /** Refer 
https://help.aliyun.com/zh/maxcompute/user-guide/maxcompute-v2-0-data-type-edition
 */
 @Slf4j
 @AutoService(TypeConverter.class)
@@ -104,7 +103,7 @@ public class MaxComputeTypeConverter implements 
TypeConverter<BasicTypeDefine<Ty
 
     @Override
     public String identifier() {
-        return PLUGIN_NAME;
+        return MaxcomputeBaseOptions.PLUGIN_NAME;
     }
 
     @Override
@@ -161,7 +160,7 @@ public class MaxComputeTypeConverter implements 
TypeConverter<BasicTypeDefine<Ty
                     break;
                 default:
                     throw CommonError.unsupportedDataType(
-                            PLUGIN_NAME,
+                            MaxcomputeBaseOptions.PLUGIN_NAME,
                             arrayColumn.getDataType().getSqlType().toString(),
                             typeDefine.getName());
             }
@@ -306,7 +305,7 @@ public class MaxComputeTypeConverter implements 
TypeConverter<BasicTypeDefine<Ty
                 case INTERVAL:
                 default:
                     throw CommonError.convertToSeaTunnelTypeError(
-                            PLUGIN_NAME, dataType, typeDefine.getName());
+                            MaxcomputeBaseOptions.PLUGIN_NAME, dataType, 
typeDefine.getName());
             }
         }
         return builder.build();
@@ -534,7 +533,9 @@ public class MaxComputeTypeConverter implements 
TypeConverter<BasicTypeDefine<Ty
             case TIME:
             default:
                 throw CommonError.convertToConnectorTypeError(
-                        PLUGIN_NAME, column.getDataType().getSqlType().name(), 
column.getName());
+                        MaxcomputeBaseOptions.PLUGIN_NAME,
+                        column.getDataType().getSqlType().name(),
+                        column.getName());
         }
 
         return builder.build();
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
index e8fa3832b9..4156bc697b 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
@@ -24,13 +24,12 @@ import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.aliyun.odps.PartitionSpec;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-
 public class MaxComputeSaveModeHandler extends DefaultSaveModeHandler {
 
     private final ReadonlyConfig readonlyConfig;
@@ -49,20 +48,24 @@ public class MaxComputeSaveModeHandler extends 
DefaultSaveModeHandler {
     @Override
     protected void createSchemaWhenNotExist() {
         super.createSchemaWhenNotExist();
-        if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+        if 
(StringUtils.isNotEmpty(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)))
 {
             ((MaxComputeCatalog) catalog)
                     .createPartition(
-                            tablePath, new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+                            tablePath,
+                            new PartitionSpec(
+                                    
readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)));
         }
     }
 
     @Override
     protected void recreateSchema() {
         super.recreateSchema();
-        if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+        if 
(StringUtils.isNotEmpty(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)))
 {
             ((MaxComputeCatalog) catalog)
                     .createPartition(
-                            tablePath, new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+                            tablePath,
+                            new PartitionSpec(
+                                    
readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)));
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index de3d90f6a7..dcf5ba567a 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
 
 import org.slf4j.Logger;
@@ -39,8 +39,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
 
 public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportSaveMode, SupportMultiTableSink {
@@ -55,7 +53,7 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
     @Override
     public String getPluginName() {
-        return PLUGIN_NAME;
+        return MaxcomputeSinkOptions.PLUGIN_NAME;
     }
 
     @Override
@@ -69,7 +67,7 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                 discoverFactory(
                         Thread.currentThread().getContextClassLoader(),
                         CatalogFactory.class,
-                        "MaxCompute");
+                        MaxcomputeSinkOptions.PLUGIN_NAME);
         if (catalogFactory == null) {
             throw new MaxcomputeConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -84,8 +82,8 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                         catalogFactory.createCatalog(
                                 catalogFactory.factoryIdentifier(), 
readonlyConfig);
 
-        DataSaveMode dataSaveMode = 
readonlyConfig.get(MaxcomputeConfig.DATA_SAVE_MODE);
-        if (readonlyConfig.get(OVERWRITE)) {
+        DataSaveMode dataSaveMode = 
readonlyConfig.get(MaxcomputeSinkOptions.DATA_SAVE_MODE);
+        if (readonlyConfig.get(MaxcomputeSinkOptions.OVERWRITE)) {
             // compatible with old version
             LOG.warn(
                     "The configuration of 'overwrite' is deprecated, please 
use 'data_save_mode' instead.");
@@ -94,11 +92,11 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
         return Optional.of(
                 new MaxComputeSaveModeHandler(
-                        readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
+                        
readonlyConfig.get(MaxcomputeSinkOptions.SCHEMA_SAVE_MODE),
                         dataSaveMode,
                         catalog,
                         catalogTable,
-                        readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL),
+                        readonlyConfig.get(MaxcomputeSinkOptions.CUSTOM_SQL),
                         readonlyConfig));
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index 2fffe55f20..a89d4bc27b 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -25,40 +25,33 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.CUSTOM_SQL;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.DATA_SAVE_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SCHEMA_SAVE_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
 @AutoService(Factory.class)
 public class MaxcomputeSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return PLUGIN_NAME;
+        return MaxcomputeSinkOptions.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+                .required(
+                        MaxcomputeSinkOptions.ACCESS_ID,
+                        MaxcomputeSinkOptions.ACCESS_KEY,
+                        MaxcomputeSinkOptions.ENDPOINT,
+                        MaxcomputeSinkOptions.PROJECT,
+                        MaxcomputeSinkOptions.TABLE_NAME)
                 .optional(
-                        PARTITION_SPEC,
-                        OVERWRITE,
-                        SCHEMA_SAVE_MODE,
-                        DATA_SAVE_MODE,
-                        SAVE_MODE_CREATE_TEMPLATE,
-                        CUSTOM_SQL,
+                        MaxcomputeSinkOptions.PARTITION_SPEC,
+                        MaxcomputeSinkOptions.OVERWRITE,
+                        MaxcomputeSinkOptions.SCHEMA_SAVE_MODE,
+                        MaxcomputeSinkOptions.DATA_SAVE_MODE,
+                        MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+                        MaxcomputeSinkOptions.CUSTOM_SQL,
                         SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
@@ -71,8 +64,8 @@ public class MaxcomputeSinkFactory implements 
TableSinkFactory {
                         CatalogTable.of(
                                 TableIdentifier.of(
                                         
context.getCatalogTable().getCatalogName(),
-                                        context.getOptions().get(PROJECT),
-                                        context.getOptions().get(TABLE_NAME)),
+                                        
context.getOptions().get(MaxcomputeSinkOptions.PROJECT),
+                                        
context.getOptions().get(MaxcomputeSinkOptions.TABLE_NAME)),
                                 context.getCatalogTable()));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index 9de521a493..423ead236f 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
@@ -37,10 +38,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
 @Slf4j
 public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
@@ -55,17 +52,19 @@ public class MaxcomputeWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
             Table table = MaxcomputeUtil.getTable(readonlyConfig);
             this.tableSchema = table.getSchema();
             TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
-            if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
-                PartitionSpec partitionSpec = new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+            if 
(readonlyConfig.getOptional(MaxcomputeSinkOptions.PARTITION_SPEC).isPresent()) {
+                PartitionSpec partitionSpec =
+                        new 
PartitionSpec(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC));
                 session =
                         tunnel.createUploadSession(
-                                readonlyConfig.get(PROJECT),
-                                readonlyConfig.get(TABLE_NAME),
+                                
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
+                                
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME),
                                 partitionSpec);
             } else {
                 session =
                         tunnel.createUploadSession(
-                                readonlyConfig.get(PROJECT), 
readonlyConfig.get(TABLE_NAME));
+                                
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
+                                
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME));
             }
             this.recordWriter = session.openBufferedWriter();
             log.info("open record writer success");
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index 8b6a8d9676..5657e443b6 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSourceOptions;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -39,14 +40,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.READ_COLUMNS;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_LIST;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
 @Slf4j
 public class MaxcomputeSource
         implements SeaTunnelSource<SeaTunnelRow, MaxcomputeSourceSplit, 
MaxcomputeSourceState>,
@@ -62,7 +55,7 @@ public class MaxcomputeSource
 
     @Override
     public String getPluginName() {
-        return PLUGIN_NAME;
+        return MaxcomputeSourceOptions.PLUGIN_NAME;
     }
 
     private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig 
readonlyConfig) {
@@ -74,31 +67,38 @@ public class MaxcomputeSource
                     CatalogTable.of(
                             TableIdentifier.of(
                                     "maxcompute",
-                                    readonlyConfig.get(PROJECT),
-                                    readonlyConfig.get(TABLE_NAME)),
+                                    
readonlyConfig.get(MaxcomputeSourceOptions.PROJECT),
+                                    
readonlyConfig.get(MaxcomputeSourceOptions.TABLE_NAME)),
                             catalogTable);
             tables.put(
                     catalogTable.getTablePath(),
                     new SourceTableInfo(
                             catalogTable,
-                            readonlyConfig.get(PARTITION_SPEC),
-                            readonlyConfig.get(SPLIT_ROW)));
+                            
readonlyConfig.get(MaxcomputeSourceOptions.PARTITION_SPEC),
+                            
readonlyConfig.get(MaxcomputeSourceOptions.SPLIT_ROW)));
         } else {
             try (MaxComputeCatalog catalog = new 
MaxComputeCatalog("maxcompute", readonlyConfig)) {
                 catalog.open();
-                if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) {
-                    for (Map<String, Object> subConfig : 
readonlyConfig.get(TABLE_LIST)) {
+                if 
(readonlyConfig.getOptional(MaxcomputeSourceOptions.TABLE_LIST).isPresent()) {
+                    for (Map<String, Object> subConfig :
+                            
readonlyConfig.get(MaxcomputeSourceOptions.TABLE_LIST)) {
                         ReadonlyConfig subReadonlyConfig = 
ReadonlyConfig.fromMap(subConfig);
                         String project =
                                 subReadonlyConfig
-                                        .getOptional(PROJECT)
-                                        .orElse(readonlyConfig.get(PROJECT));
+                                        
.getOptional(MaxcomputeSourceOptions.PROJECT)
+                                        .orElse(
+                                                readonlyConfig.get(
+                                                        
MaxcomputeSourceOptions.PROJECT));
                         TablePath tablePath =
-                                TablePath.of(project, 
subReadonlyConfig.get(TABLE_NAME));
+                                TablePath.of(
+                                        project,
+                                        
subReadonlyConfig.get(MaxcomputeSourceOptions.TABLE_NAME));
                         String partitionSpec =
                                 subReadonlyConfig
-                                        .getOptional(PARTITION_SPEC)
-                                        
.orElse(readonlyConfig.get(PARTITION_SPEC));
+                                        
.getOptional(MaxcomputeSourceOptions.PARTITION_SPEC)
+                                        .orElse(
+                                                readonlyConfig.get(
+                                                        
MaxcomputeSourceOptions.PARTITION_SPEC));
 
                         if (subReadonlyConfig
                                 .getOptional(ConnectorCommonOptions.SCHEMA)
@@ -114,17 +114,22 @@ public class MaxcomputeSource
                                     new SourceTableInfo(
                                             catalogTable,
                                             partitionSpec,
-                                            subReadonlyConfig.get(SPLIT_ROW)));
+                                            subReadonlyConfig.get(
+                                                    
MaxcomputeSourceOptions.SPLIT_ROW)));
                         } else {
                             Integer splitRow =
                                     subReadonlyConfig
-                                            .getOptional(SPLIT_ROW)
-                                            
.orElse(readonlyConfig.get(SPLIT_ROW));
+                                            
.getOptional(MaxcomputeSourceOptions.SPLIT_ROW)
+                                            .orElse(
+                                                    readonlyConfig.get(
+                                                            
MaxcomputeSourceOptions.SPLIT_ROW));
                             tables.put(
                                     tablePath,
                                     new SourceTableInfo(
                                             catalog.getTable(
-                                                    tablePath, 
subReadonlyConfig.get(READ_COLUMNS)),
+                                                    tablePath,
+                                                    subReadonlyConfig.get(
+                                                            
MaxcomputeSourceOptions.READ_COLUMNS)),
                                             partitionSpec,
                                             splitRow));
                         }
@@ -132,13 +137,17 @@ public class MaxcomputeSource
                 } else {
                     TablePath tablePath =
                             TablePath.of(
-                                    readonlyConfig.get(PROJECT), 
readonlyConfig.get(TABLE_NAME));
+                                    
readonlyConfig.get(MaxcomputeSourceOptions.PROJECT),
+                                    
readonlyConfig.get(MaxcomputeSourceOptions.TABLE_NAME));
                     tables.put(
                             tablePath,
                             new SourceTableInfo(
-                                    catalog.getTable(tablePath, 
readonlyConfig.get(READ_COLUMNS)),
-                                    readonlyConfig.get(PARTITION_SPEC),
-                                    readonlyConfig.get(SPLIT_ROW)));
+                                    catalog.getTable(
+                                            tablePath,
+                                            readonlyConfig.get(
+                                                    
MaxcomputeSourceOptions.READ_COLUMNS)),
+                                    
readonlyConfig.get(MaxcomputeSourceOptions.PARTITION_SPEC),
+                                    
readonlyConfig.get(MaxcomputeSourceOptions.SPLIT_ROW)));
                 }
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
index 7887e686fa..1662abd231 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
@@ -25,40 +25,33 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSourceOptions;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.READ_COLUMNS;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_LIST;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
 @AutoService(Factory.class)
 public class MaxcomputeSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return PLUGIN_NAME;
+        return MaxcomputeSourceOptions.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(ACCESS_ID, ACCESS_KEY, ENDPOINT)
+                .required(
+                        MaxcomputeSourceOptions.ACCESS_ID,
+                        MaxcomputeSourceOptions.ACCESS_KEY,
+                        MaxcomputeSourceOptions.ENDPOINT)
                 .optional(
-                        PARTITION_SPEC,
-                        SPLIT_ROW,
+                        MaxcomputeSourceOptions.PARTITION_SPEC,
+                        MaxcomputeSourceOptions.SPLIT_ROW,
                         ConnectorCommonOptions.SCHEMA,
-                        PROJECT,
-                        READ_COLUMNS)
-                .exclusive(TABLE_LIST, TABLE_NAME)
+                        MaxcomputeSourceOptions.PROJECT,
+                        MaxcomputeSourceOptions.READ_COLUMNS)
+                .exclusive(MaxcomputeSourceOptions.TABLE_LIST, 
MaxcomputeSourceOptions.TABLE_NAME)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
index ec2a009e4b..f44f27cb0c 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
 
 import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,8 +36,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-
 @Slf4j
 public class MaxcomputeSourceSplitEnumerator
         implements SourceSplitEnumerator<MaxcomputeSourceSplit, 
MaxcomputeSourceState> {
@@ -114,7 +113,7 @@ public class MaxcomputeSourceSplitEnumerator
                             sourceTableInfo.getPartitionSpec());
             long recordCount = session.getRecordCount();
             int splitRowNum = (int) Math.ceil((double) recordCount / 
numReaders);
-            int splitRow = SPLIT_ROW.defaultValue();
+            int splitRow = MaxcomputeSourceOptions.SPLIT_ROW.defaultValue();
             if (sourceTableInfo.getSplitRow() != null && 
sourceTableInfo.getSplitRow() > 0) {
                 splitRow = sourceTableInfo.getSplitRow();
             }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
index 349b7b73d0..325d052b57 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -32,18 +33,11 @@ import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.tunnel.TableTunnel;
 import lombok.extern.slf4j.Slf4j;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
 @Slf4j
 public class MaxcomputeUtil {
     public static Table getTable(ReadonlyConfig readonlyConfig) {
         Odps odps = getOdps(readonlyConfig);
-        return odps.tables().get(readonlyConfig.get(TABLE_NAME));
+        return 
odps.tables().get(readonlyConfig.get(MaxcomputeBaseOptions.TABLE_NAME));
     }
 
     public static TableTunnel getTableTunnel(ReadonlyConfig readonlyConfig) {
@@ -53,10 +47,12 @@ public class MaxcomputeUtil {
 
     public static Odps getOdps(ReadonlyConfig readonlyConfig) {
         Account account =
-                new AliyunAccount(readonlyConfig.get(ACCESS_ID), 
readonlyConfig.get(ACCESS_KEY));
+                new AliyunAccount(
+                        readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_ID),
+                        readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_KEY));
         Odps odps = new Odps(account);
-        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
-        odps.setDefaultProject(readonlyConfig.get(PROJECT));
+        odps.setEndpoint(readonlyConfig.get(MaxcomputeBaseOptions.ENDPOINT));
+        
odps.setDefaultProject(readonlyConfig.get(MaxcomputeBaseOptions.PROJECT));
         return odps;
     }
 
@@ -64,17 +60,19 @@ public class MaxcomputeUtil {
         TableTunnel tunnel = getTableTunnel(readonlyConfig);
         TableTunnel.DownloadSession session;
         try {
-            if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
-                PartitionSpec partitionSpec = new 
PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+            if 
(readonlyConfig.getOptional(MaxcomputeBaseOptions.PARTITION_SPEC).isPresent()) {
+                PartitionSpec partitionSpec =
+                        new 
PartitionSpec(readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC));
                 session =
                         tunnel.createDownloadSession(
-                                readonlyConfig.get(PROJECT),
-                                readonlyConfig.get(TABLE_NAME),
+                                
readonlyConfig.get(MaxcomputeBaseOptions.PROJECT),
+                                
readonlyConfig.get(MaxcomputeBaseOptions.TABLE_NAME),
                                 partitionSpec);
             } else {
                 session =
                         tunnel.createDownloadSession(
-                                readonlyConfig.get(PROJECT), 
readonlyConfig.get(TABLE_NAME));
+                                
readonlyConfig.get(MaxcomputeBaseOptions.PROJECT),
+                                
readonlyConfig.get(MaxcomputeBaseOptions.TABLE_NAME));
             }
         } catch (Exception e) {
             throw new MaxcomputeConnectorException(
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
index b0b05cf214..92c56fd1c7 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -73,7 +73,7 @@ public class MaxComputeDataTypeConvertorTest {
     @Test
     public void getIdentity() {
         Assertions.assertEquals(
-                MaxcomputeConfig.PLUGIN_NAME, 
maxComputeDataTypeConvertor.getIdentity());
+                MaxcomputeBaseOptions.PLUGIN_NAME, 
maxComputeDataTypeConvertor.getIdentity());
     }
 
     @Test

Reply via email to