This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fdacbae1af [Improve] maxcompute options (#9163)
fdacbae1af is described below
commit fdacbae1af73f9866d2f42191ccc320db52b446e
Author: Jarvis <[email protected]>
AuthorDate: Mon Apr 14 14:18:47 2025 +0800
[Improve] maxcompute options (#9163)
Co-authored-by: Jia Fan <[email protected]>
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../maxcompute/catalog/MaxComputeCatalog.java | 32 ++++++-----
.../catalog/MaxComputeCatalogFactory.java | 24 ++++----
.../catalog/MaxComputeDataTypeConvertor.java | 8 +--
...mputeConfig.java => MaxcomputeBaseOptions.java} | 66 ++++------------------
...mputeConfig.java => MaxcomputeSinkOptions.java} | 60 +-------------------
.../maxcompute/config/MaxcomputeSourceOptions.java | 32 +++++++++++
.../datatype/MaxComputeTypeConverter.java | 13 +++--
.../maxcompute/sink/MaxComputeSaveModeHandler.java | 15 +++--
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 16 +++---
.../maxcompute/sink/MaxcomputeSinkFactory.java | 39 ++++++-------
.../maxcompute/sink/MaxcomputeWriter.java | 17 +++---
.../maxcompute/source/MaxcomputeSource.java | 65 ++++++++++++---------
.../maxcompute/source/MaxcomputeSourceFactory.java | 29 ++++------
.../source/MaxcomputeSourceSplitEnumerator.java | 5 +-
.../seatunnel/maxcompute/util/MaxcomputeUtil.java | 30 +++++-----
.../catalog/MaxComputeDataTypeConvertorTest.java | 4 +-
17 files changed, 191 insertions(+), 266 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 3bac19ff2f..24566951d3 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -195,7 +195,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("PulsarSinkOptions");
whiteList.add("SlsSinkOptions");
whiteList.add("Neo4jSinkOptions");
- whiteList.add("MaxcomputeSinkOptions");
whiteList.add("TDengineSourceOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("MongodbSinkOptions");
@@ -203,7 +202,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("SentrySinkOptions");
whiteList.add("QdrantSinkOptions");
whiteList.add("RocketMqSinkOptions");
- whiteList.add("MaxcomputeSourceOptions");
whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
whiteList.add("RocketMqSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index d0e0503188..12eb8b4667 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -33,6 +33,8 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
@@ -55,12 +57,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@Slf4j
@@ -78,7 +74,10 @@ public class MaxComputeCatalog implements Catalog {
@Override
public void open() throws CatalogException {
- account = new AliyunAccount(readonlyConfig.get(ACCESS_ID),
readonlyConfig.get(ACCESS_KEY));
+ account =
+ new AliyunAccount(
+ readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_ID),
+ readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_KEY));
}
@Override
@@ -91,13 +90,13 @@ public class MaxComputeCatalog implements Catalog {
@Override
public String getDefaultDatabase() throws CatalogException {
- return readonlyConfig.get(PROJECT);
+ return readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
}
@Override
public boolean databaseExists(String databaseName) throws CatalogException
{
try {
- Odps odps = getOdps(readonlyConfig.get(PROJECT));
+ Odps odps =
getOdps(readonlyConfig.get(MaxcomputeBaseOptions.PROJECT));
Projects projects = odps.projects();
return projects.exists(databaseName);
} catch (OdpsException e) {
@@ -109,7 +108,7 @@ public class MaxComputeCatalog implements Catalog {
public List<String> listDatabases() throws CatalogException {
try {
// todo: how to get all projects
- String project = readonlyConfig.get(PROJECT);
+ String project = readonlyConfig.get(MaxcomputeBaseOptions.PROJECT);
if (databaseExists(project)) {
return Lists.newArrayList(project);
}
@@ -210,7 +209,8 @@ public class MaxComputeCatalog implements Catalog {
SQLTask.run(
odps,
MaxComputeCatalogUtil.getCreateTableStatement(
-
readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
+ readonlyConfig.get(
+
MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
tablePath,
table))
.waitForSuccess();
@@ -250,8 +250,10 @@ public class MaxComputeCatalog implements Catalog {
Odps odps = getOdps(tablePath.getDatabaseName());
Table odpsTable = odps.tables().get(tablePath.getTableName());
if (odpsTable.isPartitioned()
- &&
StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
- PartitionSpec partitionSpec = new
PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+ && StringUtils.isNotEmpty(
+
readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC))) {
+ PartitionSpec partitionSpec =
+ new
PartitionSpec(readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC));
odpsTable.deletePartition(partitionSpec, ignoreIfNotExists);
odpsTable.createPartition(partitionSpec, true);
} else {
@@ -313,7 +315,7 @@ public class MaxComputeCatalog implements Catalog {
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be
null");
return new SQLPreviewResult(
MaxComputeCatalogUtil.getCreateTableStatement(
- readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE),
+
readonlyConfig.get(MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE),
tablePath,
catalogTable.get()));
} else if (actionType == ActionType.DROP_TABLE) {
@@ -325,7 +327,7 @@ public class MaxComputeCatalog implements Catalog {
private Odps getOdps(String project) {
Odps odps = new Odps(account);
- odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+ odps.setEndpoint(readonlyConfig.get(MaxcomputeBaseOptions.ENDPOINT));
odps.setDefaultProject(project);
return odps;
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
index b170b79d48..b8f6187617 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
@@ -23,18 +23,10 @@ import
org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
@AutoService(Factory.class)
public class MaxComputeCatalogFactory implements CatalogFactory {
@@ -45,14 +37,22 @@ public class MaxComputeCatalogFactory implements
CatalogFactory {
@Override
public String factoryIdentifier() {
- return PLUGIN_NAME;
+ return MaxcomputeBaseOptions.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
- .optional(PARTITION_SPEC, SPLIT_ROW,
ConnectorCommonOptions.SCHEMA)
+ .required(
+ MaxcomputeBaseOptions.ACCESS_ID,
+ MaxcomputeBaseOptions.ACCESS_KEY,
+ MaxcomputeBaseOptions.ENDPOINT,
+ MaxcomputeBaseOptions.PROJECT,
+ MaxcomputeBaseOptions.TABLE_NAME)
+ .optional(
+ MaxcomputeBaseOptions.PARTITION_SPEC,
+ MaxcomputeBaseOptions.SPLIT_ROW,
+ ConnectorCommonOptions.SCHEMA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
index 17886b87c3..e6d4271703 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.datatype.MaxComputeTypeConverter;
import org.apache.commons.collections4.MapUtils;
@@ -82,7 +82,7 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
throw CommonError.convertToSeaTunnelTypeError(
- MaxcomputeConfig.PLUGIN_NAME, connectorDataType,
field);
+ MaxcomputeBaseOptions.PLUGIN_NAME,
connectorDataType, field);
}
}
if (connectorDataType.startsWith("STRUCT")) {
@@ -141,7 +141,7 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
return BasicType.VOID_TYPE;
default:
throw CommonError.convertToSeaTunnelTypeError(
- MaxcomputeConfig.PLUGIN_NAME, connectorDataType,
field);
+ MaxcomputeBaseOptions.PLUGIN_NAME, connectorDataType,
field);
}
}
@@ -183,6 +183,6 @@ public class MaxComputeDataTypeConvertor implements
DataTypeConvertor<TypeInfo>
@Override
public String getIdentity() {
- return MaxcomputeConfig.PLUGIN_NAME;
+ return MaxcomputeBaseOptions.PLUGIN_NAME;
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeBaseOptions.java
similarity index 58%
copy from
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
copy to
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeBaseOptions.java
index a4515380f7..f4abe04810 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeBaseOptions.java
@@ -21,19 +21,15 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
-import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-public class MaxcomputeConfig implements Serializable {
+public class MaxcomputeBaseOptions implements Serializable {
public static final String PLUGIN_NAME = "Maxcompute";
- private static final int SPLIT_ROW_DEFAULT = 10000;
public static final Option<String> ACCESS_ID =
Options.key("accessId")
.stringType()
@@ -51,72 +47,34 @@ public class MaxcomputeConfig implements Serializable {
.stringType()
.noDefaultValue()
.withDescription("Your Maxcompute endpoint start with
http");
+
public static final Option<String> PROJECT =
Options.key("project")
.stringType()
.noDefaultValue()
.withDescription("Your Maxcompute project which is created
in Alibaba Cloud");
+
+ public static final Option<List<Map<String, Object>>> TABLE_LIST =
+ Options.key("table_list")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription("List of tables to be written to
MaxCompute.");
+
public static final Option<String> TABLE_NAME =
Options.key("table_name")
.stringType()
.noDefaultValue()
.withDescription("Target Maxcompute table name eg: fake");
+
public static final Option<String> PARTITION_SPEC =
Options.key("partition_spec")
.stringType()
.noDefaultValue()
.withDescription("This spec of Maxcompute partition
table.");
+
public static final Option<Integer> SPLIT_ROW =
Options.key("split_row")
.intType()
- .defaultValue(SPLIT_ROW_DEFAULT)
+ .defaultValue(10000)
.withDescription("Number of rows per split. default:
10000");
- public static final Option<Boolean> OVERWRITE =
- Options.key("overwrite")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to overwrite the table or
partition");
-
- public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
- Options.key("schema_save_mode")
- .enumType(SchemaSaveMode.class)
- .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
- .withDescription("schema_save_mode");
-
- public static final Option<DataSaveMode> DATA_SAVE_MODE =
- Options.key("data_save_mode")
- .enumType(DataSaveMode.class)
- .defaultValue(DataSaveMode.APPEND_DATA)
- .withDescription("data_save_mode");
-
- public static final Option<String> CUSTOM_SQL =
-
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
-
- public static final Option<List<String>> READ_COLUMNS =
- Options.key("read_columns")
- .listType()
- .noDefaultValue()
- .withDescription("The read columns of the table");
-
- public static final Option<List<Map<String, Object>>> TABLE_LIST =
- Options.key("table_list")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription("List of tables to be written to
MaxCompute.");
-
- // create table
- public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
- Options.key("save_mode_create_template")
- .stringType()
- .defaultValue(
- "CREATE TABLE IF NOT EXISTS `"
- +
SaveModePlaceHolder.TABLE.getPlaceHolder()
- + "` (\n"
- +
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
- + "\n"
- + ") COMMENT '"
- +
SaveModePlaceHolder.COMMENT.getPlaceHolder()
- + "' ;")
- .withDescription(
- "Create table statement template, used to create
MaxCompute table");
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
similarity index 52%
rename from
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
rename to
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
index a4515380f7..8a6da00f00 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
@@ -17,60 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class MaxcomputeConfig implements Serializable {
+public class MaxcomputeSinkOptions extends MaxcomputeBaseOptions {
- public static final String PLUGIN_NAME = "Maxcompute";
-
- private static final int SPLIT_ROW_DEFAULT = 10000;
- public static final Option<String> ACCESS_ID =
- Options.key("accessId")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Your Maxcompute accessId which cloud be access
from Alibaba Cloud");
- public static final Option<String> ACCESS_KEY =
- Options.key("accesskey")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Your Maxcompute accessKey which cloud be access
from Alibaba Cloud");
- public static final Option<String> ENDPOINT =
- Options.key("endpoint")
- .stringType()
- .noDefaultValue()
- .withDescription("Your Maxcompute endpoint start with
http");
- public static final Option<String> PROJECT =
- Options.key("project")
- .stringType()
- .noDefaultValue()
- .withDescription("Your Maxcompute project which is created
in Alibaba Cloud");
- public static final Option<String> TABLE_NAME =
- Options.key("table_name")
- .stringType()
- .noDefaultValue()
- .withDescription("Target Maxcompute table name eg: fake");
- public static final Option<String> PARTITION_SPEC =
- Options.key("partition_spec")
- .stringType()
- .noDefaultValue()
- .withDescription("This spec of Maxcompute partition
table.");
- public static final Option<Integer> SPLIT_ROW =
- Options.key("split_row")
- .intType()
- .defaultValue(SPLIT_ROW_DEFAULT)
- .withDescription("Number of rows per split. default:
10000");
public static final Option<Boolean> OVERWRITE =
Options.key("overwrite")
.booleanType()
@@ -92,18 +46,6 @@ public class MaxcomputeConfig implements Serializable {
public static final Option<String> CUSTOM_SQL =
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
- public static final Option<List<String>> READ_COLUMNS =
- Options.key("read_columns")
- .listType()
- .noDefaultValue()
- .withDescription("The read columns of the table");
-
- public static final Option<List<Map<String, Object>>> TABLE_LIST =
- Options.key("table_list")
- .type(new TypeReference<List<Map<String, Object>>>() {})
- .noDefaultValue()
- .withDescription("List of tables to be written to
MaxCompute.");
-
// create table
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSourceOptions.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSourceOptions.java
new file mode 100644
index 0000000000..452223d566
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSourceOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class MaxcomputeSourceOptions extends MaxcomputeBaseOptions {
+
+ public static final Option<List<String>> READ_COLUMNS =
+ Options.key("read_columns")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The read columns of the table");
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
index 0c81dacc2a..ed950d4d3c 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.type.AbstractCharTypeInfo;
@@ -47,8 +48,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-
/** Refer
https://help.aliyun.com/zh/maxcompute/user-guide/maxcompute-v2-0-data-type-edition
*/
@Slf4j
@AutoService(TypeConverter.class)
@@ -104,7 +103,7 @@ public class MaxComputeTypeConverter implements
TypeConverter<BasicTypeDefine<Ty
@Override
public String identifier() {
- return PLUGIN_NAME;
+ return MaxcomputeBaseOptions.PLUGIN_NAME;
}
@Override
@@ -161,7 +160,7 @@ public class MaxComputeTypeConverter implements
TypeConverter<BasicTypeDefine<Ty
break;
default:
throw CommonError.unsupportedDataType(
- PLUGIN_NAME,
+ MaxcomputeBaseOptions.PLUGIN_NAME,
arrayColumn.getDataType().getSqlType().toString(),
typeDefine.getName());
}
@@ -306,7 +305,7 @@ public class MaxComputeTypeConverter implements
TypeConverter<BasicTypeDefine<Ty
case INTERVAL:
default:
throw CommonError.convertToSeaTunnelTypeError(
- PLUGIN_NAME, dataType, typeDefine.getName());
+ MaxcomputeBaseOptions.PLUGIN_NAME, dataType,
typeDefine.getName());
}
}
return builder.build();
@@ -534,7 +533,9 @@ public class MaxComputeTypeConverter implements
TypeConverter<BasicTypeDefine<Ty
case TIME:
default:
throw CommonError.convertToConnectorTypeError(
- PLUGIN_NAME, column.getDataType().getSqlType().name(),
column.getName());
+ MaxcomputeBaseOptions.PLUGIN_NAME,
+ column.getDataType().getSqlType().name(),
+ column.getName());
}
return builder.build();
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
index e8fa3832b9..4156bc697b 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxComputeSaveModeHandler.java
@@ -24,13 +24,12 @@ import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import org.apache.commons.lang3.StringUtils;
import com.aliyun.odps.PartitionSpec;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-
public class MaxComputeSaveModeHandler extends DefaultSaveModeHandler {
private final ReadonlyConfig readonlyConfig;
@@ -49,20 +48,24 @@ public class MaxComputeSaveModeHandler extends
DefaultSaveModeHandler {
@Override
protected void createSchemaWhenNotExist() {
super.createSchemaWhenNotExist();
- if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+ if
(StringUtils.isNotEmpty(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)))
{
((MaxComputeCatalog) catalog)
.createPartition(
- tablePath, new
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+ tablePath,
+ new PartitionSpec(
+
readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)));
}
}
@Override
protected void recreateSchema() {
super.recreateSchema();
- if (StringUtils.isNotEmpty(readonlyConfig.get(PARTITION_SPEC))) {
+ if
(StringUtils.isNotEmpty(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)))
{
((MaxComputeCatalog) catalog)
.createPartition(
- tablePath, new
PartitionSpec(readonlyConfig.get(PARTITION_SPEC)));
+ tablePath,
+ new PartitionSpec(
+
readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC)));
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index de3d90f6a7..dcf5ba567a 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -30,7 +30,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.slf4j.Logger;
@@ -39,8 +39,6 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
-import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode, SupportMultiTableSink {
@@ -55,7 +53,7 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
@Override
public String getPluginName() {
- return PLUGIN_NAME;
+ return MaxcomputeSinkOptions.PLUGIN_NAME;
}
@Override
@@ -69,7 +67,7 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
discoverFactory(
Thread.currentThread().getContextClassLoader(),
CatalogFactory.class,
- "MaxCompute");
+ MaxcomputeSinkOptions.PLUGIN_NAME);
if (catalogFactory == null) {
throw new MaxcomputeConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -84,8 +82,8 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
catalogFactory.createCatalog(
catalogFactory.factoryIdentifier(),
readonlyConfig);
- DataSaveMode dataSaveMode =
readonlyConfig.get(MaxcomputeConfig.DATA_SAVE_MODE);
- if (readonlyConfig.get(OVERWRITE)) {
+ DataSaveMode dataSaveMode =
readonlyConfig.get(MaxcomputeSinkOptions.DATA_SAVE_MODE);
+ if (readonlyConfig.get(MaxcomputeSinkOptions.OVERWRITE)) {
// compatible with old version
LOG.warn(
"The configuration of 'overwrite' is deprecated, please
use 'data_save_mode' instead.");
@@ -94,11 +92,11 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
return Optional.of(
new MaxComputeSaveModeHandler(
- readonlyConfig.get(MaxcomputeConfig.SCHEMA_SAVE_MODE),
+
readonlyConfig.get(MaxcomputeSinkOptions.SCHEMA_SAVE_MODE),
dataSaveMode,
catalog,
catalogTable,
- readonlyConfig.get(MaxcomputeConfig.CUSTOM_SQL),
+ readonlyConfig.get(MaxcomputeSinkOptions.CUSTOM_SQL),
readonlyConfig));
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index 2fffe55f20..a89d4bc27b 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -25,40 +25,33 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import com.google.auto.service.AutoService;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.CUSTOM_SQL;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.DATA_SAVE_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SAVE_MODE_CREATE_TEMPLATE;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SCHEMA_SAVE_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
@AutoService(Factory.class)
public class MaxcomputeSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return PLUGIN_NAME;
+ return MaxcomputeSinkOptions.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+ .required(
+ MaxcomputeSinkOptions.ACCESS_ID,
+ MaxcomputeSinkOptions.ACCESS_KEY,
+ MaxcomputeSinkOptions.ENDPOINT,
+ MaxcomputeSinkOptions.PROJECT,
+ MaxcomputeSinkOptions.TABLE_NAME)
.optional(
- PARTITION_SPEC,
- OVERWRITE,
- SCHEMA_SAVE_MODE,
- DATA_SAVE_MODE,
- SAVE_MODE_CREATE_TEMPLATE,
- CUSTOM_SQL,
+ MaxcomputeSinkOptions.PARTITION_SPEC,
+ MaxcomputeSinkOptions.OVERWRITE,
+ MaxcomputeSinkOptions.SCHEMA_SAVE_MODE,
+ MaxcomputeSinkOptions.DATA_SAVE_MODE,
+ MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+ MaxcomputeSinkOptions.CUSTOM_SQL,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
@@ -71,8 +64,8 @@ public class MaxcomputeSinkFactory implements TableSinkFactory {
CatalogTable.of(
TableIdentifier.of(
context.getCatalogTable().getCatalogName(),
- context.getOptions().get(PROJECT),
- context.getOptions().get(TABLE_NAME)),
+ context.getOptions().get(MaxcomputeSinkOptions.PROJECT),
+ context.getOptions().get(MaxcomputeSinkOptions.TABLE_NAME)),
context.getCatalogTable()));
}
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index 9de521a493..423ead236f 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
@@ -37,10 +38,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
@Slf4j
public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
@@ -55,17 +52,19 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
Table table = MaxcomputeUtil.getTable(readonlyConfig);
this.tableSchema = table.getSchema();
TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
- if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
- PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+ if (readonlyConfig.getOptional(MaxcomputeSinkOptions.PARTITION_SPEC).isPresent()) {
+ PartitionSpec partitionSpec =
+ new PartitionSpec(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC));
session =
tunnel.createUploadSession(
- readonlyConfig.get(PROJECT),
- readonlyConfig.get(TABLE_NAME),
+ readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
+ readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME),
partitionSpec);
} else {
session =
tunnel.createUploadSession(
- readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
+ readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
+ readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME));
}
this.recordWriter = session.openBufferedWriter();
log.info("open record writer success");
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index 8b6a8d9676..5657e443b6 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSourceOptions;
import lombok.extern.slf4j.Slf4j;
@@ -39,14 +40,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.READ_COLUMNS;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_LIST;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
@Slf4j
public class MaxcomputeSource
implements SeaTunnelSource<SeaTunnelRow, MaxcomputeSourceSplit, MaxcomputeSourceState>,
@@ -62,7 +55,7 @@ public class MaxcomputeSource
@Override
public String getPluginName() {
- return PLUGIN_NAME;
+ return MaxcomputeSourceOptions.PLUGIN_NAME;
}
private Map<TablePath, SourceTableInfo> getSourceTableInfos(ReadonlyConfig readonlyConfig) {
@@ -74,31 +67,38 @@ public class MaxcomputeSource
CatalogTable.of(
TableIdentifier.of(
"maxcompute",
- readonlyConfig.get(PROJECT),
- readonlyConfig.get(TABLE_NAME)),
+ readonlyConfig.get(MaxcomputeSourceOptions.PROJECT),
+ readonlyConfig.get(MaxcomputeSourceOptions.TABLE_NAME)),
catalogTable);
tables.put(
catalogTable.getTablePath(),
new SourceTableInfo(
catalogTable,
- readonlyConfig.get(PARTITION_SPEC),
- readonlyConfig.get(SPLIT_ROW)));
+ readonlyConfig.get(MaxcomputeSourceOptions.PARTITION_SPEC),
+ readonlyConfig.get(MaxcomputeSourceOptions.SPLIT_ROW)));
} else {
try (MaxComputeCatalog catalog = new MaxComputeCatalog("maxcompute", readonlyConfig)) {
catalog.open();
- if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) {
- for (Map<String, Object> subConfig : readonlyConfig.get(TABLE_LIST)) {
+ if (readonlyConfig.getOptional(MaxcomputeSourceOptions.TABLE_LIST).isPresent()) {
+ for (Map<String, Object> subConfig :
+ readonlyConfig.get(MaxcomputeSourceOptions.TABLE_LIST)) {
ReadonlyConfig subReadonlyConfig =
ReadonlyConfig.fromMap(subConfig);
String project =
subReadonlyConfig
- .getOptional(PROJECT)
- .orElse(readonlyConfig.get(PROJECT));
+ .getOptional(MaxcomputeSourceOptions.PROJECT)
+ .orElse(
+ readonlyConfig.get(
+ MaxcomputeSourceOptions.PROJECT));
TablePath tablePath =
- TablePath.of(project, subReadonlyConfig.get(TABLE_NAME));
+ TablePath.of(
+ project,
+ subReadonlyConfig.get(MaxcomputeSourceOptions.TABLE_NAME));
String partitionSpec =
subReadonlyConfig
- .getOptional(PARTITION_SPEC)
- .orElse(readonlyConfig.get(PARTITION_SPEC));
+ .getOptional(MaxcomputeSourceOptions.PARTITION_SPEC)
+ .orElse(
+ readonlyConfig.get(
+ MaxcomputeSourceOptions.PARTITION_SPEC));
if (subReadonlyConfig
.getOptional(ConnectorCommonOptions.SCHEMA)
@@ -114,17 +114,22 @@ public class MaxcomputeSource
new SourceTableInfo(
catalogTable,
partitionSpec,
- subReadonlyConfig.get(SPLIT_ROW)));
+ subReadonlyConfig.get(
+ MaxcomputeSourceOptions.SPLIT_ROW)));
} else {
Integer splitRow =
subReadonlyConfig
- .getOptional(SPLIT_ROW)
- .orElse(readonlyConfig.get(SPLIT_ROW));
+ .getOptional(MaxcomputeSourceOptions.SPLIT_ROW)
+ .orElse(
+ readonlyConfig.get(
+ MaxcomputeSourceOptions.SPLIT_ROW));
tables.put(
tablePath,
new SourceTableInfo(
catalog.getTable(
- tablePath, subReadonlyConfig.get(READ_COLUMNS)),
+ tablePath,
+ subReadonlyConfig.get(
+ MaxcomputeSourceOptions.READ_COLUMNS)),
partitionSpec,
splitRow));
}
@@ -132,13 +137,17 @@ public class MaxcomputeSource
} else {
TablePath tablePath =
TablePath.of(
- readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
+ readonlyConfig.get(MaxcomputeSourceOptions.PROJECT),
+ readonlyConfig.get(MaxcomputeSourceOptions.TABLE_NAME));
tables.put(
tablePath,
new SourceTableInfo(
- catalog.getTable(tablePath, readonlyConfig.get(READ_COLUMNS)),
- readonlyConfig.get(PARTITION_SPEC),
- readonlyConfig.get(SPLIT_ROW)));
+ catalog.getTable(
+ tablePath,
+ readonlyConfig.get(
+ MaxcomputeSourceOptions.READ_COLUMNS)),
+ readonlyConfig.get(MaxcomputeSourceOptions.PARTITION_SPEC),
+ readonlyConfig.get(MaxcomputeSourceOptions.SPLIT_ROW)));
}
}
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
index 7887e686fa..1662abd231 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
@@ -25,40 +25,33 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSourceOptions;
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.READ_COLUMNS;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_LIST;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
@AutoService(Factory.class)
public class MaxcomputeSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return PLUGIN_NAME;
+ return MaxcomputeSourceOptions.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(ACCESS_ID, ACCESS_KEY, ENDPOINT)
+ .required(
+ MaxcomputeSourceOptions.ACCESS_ID,
+ MaxcomputeSourceOptions.ACCESS_KEY,
+ MaxcomputeSourceOptions.ENDPOINT)
.optional(
- PARTITION_SPEC,
- SPLIT_ROW,
+ MaxcomputeSourceOptions.PARTITION_SPEC,
+ MaxcomputeSourceOptions.SPLIT_ROW,
ConnectorCommonOptions.SCHEMA,
- PROJECT,
- READ_COLUMNS)
- .exclusive(TABLE_LIST, TABLE_NAME)
+ MaxcomputeSourceOptions.PROJECT,
+ MaxcomputeSourceOptions.READ_COLUMNS)
+ .exclusive(MaxcomputeSourceOptions.TABLE_LIST, MaxcomputeSourceOptions.TABLE_NAME)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
index ec2a009e4b..f44f27cb0c 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,8 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
-
@Slf4j
public class MaxcomputeSourceSplitEnumerator
implements SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState> {
@@ -114,7 +113,7 @@ public class MaxcomputeSourceSplitEnumerator
sourceTableInfo.getPartitionSpec());
long recordCount = session.getRecordCount();
int splitRowNum = (int) Math.ceil((double) recordCount / numReaders);
- int splitRow = SPLIT_ROW.defaultValue();
+ int splitRow = MaxcomputeSourceOptions.SPLIT_ROW.defaultValue();
if (sourceTableInfo.getSplitRow() != null && sourceTableInfo.getSplitRow() > 0) {
splitRow = sourceTableInfo.getSplitRow();
}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
index 349b7b73d0..325d052b57 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.apache.commons.lang3.StringUtils;
@@ -32,18 +33,11 @@ import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import lombok.extern.slf4j.Slf4j;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
-import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
-
@Slf4j
public class MaxcomputeUtil {
public static Table getTable(ReadonlyConfig readonlyConfig) {
Odps odps = getOdps(readonlyConfig);
- return odps.tables().get(readonlyConfig.get(TABLE_NAME));
+ return odps.tables().get(readonlyConfig.get(MaxcomputeBaseOptions.TABLE_NAME));
}
public static TableTunnel getTableTunnel(ReadonlyConfig readonlyConfig) {
@@ -53,10 +47,12 @@ public class MaxcomputeUtil {
public static Odps getOdps(ReadonlyConfig readonlyConfig) {
Account account =
- new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
+ new AliyunAccount(
+ readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_ID),
+ readonlyConfig.get(MaxcomputeBaseOptions.ACCESS_KEY));
Odps odps = new Odps(account);
- odps.setEndpoint(readonlyConfig.get(ENDPOINT));
- odps.setDefaultProject(readonlyConfig.get(PROJECT));
+ odps.setEndpoint(readonlyConfig.get(MaxcomputeBaseOptions.ENDPOINT));
+ odps.setDefaultProject(readonlyConfig.get(MaxcomputeBaseOptions.PROJECT));
return odps;
}
@@ -64,17 +60,19 @@ public class MaxcomputeUtil {
TableTunnel tunnel = getTableTunnel(readonlyConfig);
TableTunnel.DownloadSession session;
try {
- if (readonlyConfig.getOptional(PARTITION_SPEC).isPresent()) {
- PartitionSpec partitionSpec = new PartitionSpec(readonlyConfig.get(PARTITION_SPEC));
+ if (readonlyConfig.getOptional(MaxcomputeBaseOptions.PARTITION_SPEC).isPresent()) {
+ PartitionSpec partitionSpec =
+ new PartitionSpec(readonlyConfig.get(MaxcomputeBaseOptions.PARTITION_SPEC));
session =
tunnel.createDownloadSession(
- readonlyConfig.get(PROJECT),
- readonlyConfig.get(TABLE_NAME),
+ readonlyConfig.get(MaxcomputeBaseOptions.PROJECT),
+ readonlyConfig.get(MaxcomputeBaseOptions.TABLE_NAME),
partitionSpec);
} else {
session =
tunnel.createDownloadSession(
- readonlyConfig.get(PROJECT), readonlyConfig.get(TABLE_NAME));
+ readonlyConfig.get(MaxcomputeBaseOptions.PROJECT),
+ readonlyConfig.get(MaxcomputeBaseOptions.TABLE_NAME));
}
} catch (Exception e) {
throw new MaxcomputeConnectorException(
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
index b0b05cf214..92c56fd1c7 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -73,7 +73,7 @@ public class MaxComputeDataTypeConvertorTest {
@Test
public void getIdentity() {
Assertions.assertEquals(
- MaxcomputeConfig.PLUGIN_NAME, maxComputeDataTypeConvertor.getIdentity());
+ MaxcomputeBaseOptions.PLUGIN_NAME, maxComputeDataTypeConvertor.getIdentity());
}
@Test