This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 80cf8f4e4 [Feature][Connector-V2][Maxcompute] Add Maxcompute source &
sink connector (#3640)
80cf8f4e4 is described below
commit 80cf8f4e4286b41966f17a0889af9bcccfd0dc83
Author: stdnt-xiao <[email protected]>
AuthorDate: Sun Dec 11 17:05:41 2022 +0800
[Feature][Connector-V2][Maxcompute] Add Maxcompute source & sink connector
(#3640)
---
docs/en/connector-v2/sink/Maxcompute.md | 79 ++++++
docs/en/connector-v2/source/Maxcompute.md | 82 ++++++
plugin-mapping.properties | 2 +
.../connector-maxcompute/pom.xml | 61 ++++
.../maxcompute/config/MaxcomputeConfig.java | 59 ++++
.../exception/MaxcomputeConnectorException.java | 36 +++
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 67 +++++
.../maxcompute/sink/MaxcomputeSinkFactory.java | 48 ++++
.../maxcompute/sink/MaxcomputeWriter.java | 88 ++++++
.../maxcompute/source/MaxcomputeSource.java | 74 +++++
.../maxcompute/source/MaxcomputeSourceFactory.java | 48 ++++
.../maxcompute/source/MaxcomputeSourceReader.java | 107 +++++++
.../maxcompute/source/MaxcomputeSourceSplit.java | 39 +++
.../source/MaxcomputeSourceSplitEnumerator.java | 149 ++++++++++
.../maxcompute/source/MaxcomputeSourceState.java | 33 +++
.../maxcompute/util/MaxcomputeTypeMapper.java | 311 +++++++++++++++++++++
.../seatunnel/maxcompute/util/MaxcomputeUtil.java | 109 ++++++++
.../main/resources/maxcompute_to_maxcompute.conf | 68 +++++
.../src/test/java/BasicTypeToOdpsTypeTest.java | 97 +++++++
.../src/test/java/MaxcomputeSourceFactoryTest.java | 30 ++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 8 +-
22 files changed, 1595 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/sink/Maxcompute.md
b/docs/en/connector-v2/sink/Maxcompute.md
new file mode 100644
index 000000000..302dca7ae
--- /dev/null
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -0,0 +1,79 @@
+# Maxcompute
+
+> Maxcompute sink connector
+
+## Description
+
+Used to write data to Maxcompute.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-------------------|----------|-----------|---------------|
+| accessId | string | yes | - |
+| accesskey | string | yes | - |
+| endpoint | string | yes | - |
+| project | string | yes | - |
+| table_name | string | yes | - |
+| partition_spec | string | no | - |
+| overwrite | boolean | no | false |
+| common-options | string | no | |
+
+### accessId [string]
+
+`accessId` Your Maxcompute accessId, which can be obtained from Alibaba Cloud.
+
+### accesskey [string]
+
+`accesskey` Your Maxcompute accessKey, which can be obtained from Alibaba Cloud.
+
+### endpoint [string]
+
+`endpoint` Your Maxcompute endpoint, starting with http.
+
+### project [string]
+
+`project` Your Maxcompute project which is created in Alibaba Cloud.
+
+### table_name [string]
+
+`table_name` Target Maxcompute table name eg: fake.
+
+### partition_spec [string]
+
+`partition_spec` The partition spec of the Maxcompute partitioned table, e.g. ds='20220101'.
+
+### overwrite [boolean]
+
+`overwrite` Whether to overwrite the table or partition, default: false.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+ Maxcompute {
+ accessId="<your access id>"
+ accesskey="<your access Key>"
+ endpoint="<http://service.odps.aliyun.com/api>"
+ project="<your project>"
+ table_name="<your table name>"
+ #partition_spec="<your partition spec>"
+ #overwrite = false
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- [Feature] Add Maxcompute Sink
Connector([3640](https://github.com/apache/incubator-seatunnel/pull/3640))
diff --git a/docs/en/connector-v2/source/Maxcompute.md
b/docs/en/connector-v2/source/Maxcompute.md
new file mode 100644
index 000000000..133d65949
--- /dev/null
+++ b/docs/en/connector-v2/source/Maxcompute.md
@@ -0,0 +1,82 @@
+# Maxcompute
+
+> Maxcompute source connector
+
+## Description
+
+Used to read data from Maxcompute.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-----------------|--------|-----------|---------------|
+| accessId | string | yes | - |
+| accesskey | string | yes | - |
+| endpoint | string | yes | - |
+| project | string | yes | - |
+| table_name | string | yes | - |
+| partition_spec | string | no | - |
+| split_row | int | no | 10000 |
+| common-options | string | no | |
+
+### accessId [string]
+
+`accessId` Your Maxcompute accessId, which can be obtained from Alibaba Cloud.
+
+### accesskey [string]
+
+`accesskey` Your Maxcompute accessKey, which can be obtained from Alibaba Cloud.
+
+### endpoint [string]
+
+`endpoint` Your Maxcompute endpoint, starting with http.
+
+### project [string]
+
+`project` Your Maxcompute project which is created in Alibaba Cloud.
+
+### table_name [string]
+
+`table_name` Target Maxcompute table name eg: fake.
+
+### partition_spec [string]
+
+`partition_spec` The partition spec of the Maxcompute partitioned table, e.g. ds='20220101'.
+
+### split_row [int]
+
+`split_row` Number of rows per split, default: 10000.
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+source {
+ Maxcompute {
+ accessId="<your access id>"
+ accesskey="<your access Key>"
+ endpoint="<http://service.odps.aliyun.com/api>"
+ project="<your project>"
+ table_name="<your table name>"
+ #partition_spec="<your partition spec>"
+ #split_row = 10000
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- [Feature] Add Maxcompute Source
Connector([3640](https://github.com/apache/incubator-seatunnel/pull/3640))
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 35f5c2014..f3bec6c14 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -160,4 +160,6 @@ seatunnel.source.Notion = connector-http-notion
seatunnel.sink.RabbitMQ = connector-rabbitmq
seatunnel.source.RabbitMQ = connector-rabbitmq
seatunnel.source.OpenMldb = connector-openmldb
+seatunnel.source.Maxcompute = connector-maxcompute
+seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-maxcompute/pom.xml
b/seatunnel-connectors-v2/connector-maxcompute/pom.xml
new file mode 100644
index 000000000..66c6503f5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-maxcompute</artifactId>
+
+ <properties>
+ <maxcompute.version>0.31.3</maxcompute.version>
+ <commons.lang3.version>3.4</commons.lang3.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun.odps</groupId>
+ <artifactId>odps-sdk-core</artifactId>
+ <version>${maxcompute.version}-public</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons.lang3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
new file mode 100644
index 000000000..29420520b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+public class MaxcomputeConfig implements Serializable { // Option definitions for the Maxcompute connector; each key is the name used in the job config
+ private static final int SPLIT_ROW_DEFAULT = 10000; // default value of the split_row option below
+ public static final Option<String> ACCESS_ID = Options.key("accessId") // string option, no default
+ .stringType()
+ .noDefaultValue() // NOTE(review): the description below says cloud be access, likely a typo of could be accessed
+ .withDescription("Your Maxcompute accessId which cloud be access from
Alibaba Cloud");
+ public static final Option<String> ACCESS_KEY = Options.key("accesskey") // string option, no default; note the all-lowercase key
+ .stringType()
+ .noDefaultValue() // NOTE(review): same cloud be access wording as ACCESS_ID
+ .withDescription("Your Maxcompute accessKey which cloud be access from
Alibaba Cloud");
+ public static final Option<String> ENDPOINT = Options.key("endpoint") // string option, no default
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Your Maxcompute endpoint start with http");
+ public static final Option<String> PROJECT = Options.key("project") // string option, no default
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Your Maxcompute project which is created in Alibaba
Cloud");
+ public static final Option<String> TABLE_NAME = Options.key("table_name") // string option, no default
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Target Maxcompute table name eg: fake");
+ public static final Option<String> PARTITION_SPEC =
Options.key("partition_spec") // string option, no default
+ .stringType()
+ .noDefaultValue()
+ .withDescription("This spec of Maxcompute partition table.");
+ public static final Option<Integer> SPLIT_ROW = Options.key("split_row") // int option
+ .intType()
+ .defaultValue(SPLIT_ROW_DEFAULT) // falls back to 10000 when the key is absent
+ .withDescription("Number of rows per split. default: 10000");
+ public static final Option<Boolean> OVERWRITE = Options.key("overwrite") // boolean option
+ .booleanType()
+ .defaultValue(false) // overwrite is off unless explicitly enabled
+ .withDescription("Whether to overwrite the table or partition");
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java
new file mode 100644
index 000000000..83cdaaa3e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Exception type raised by the Maxcompute connector.
+ *
+ * <p>Each constructor only forwards its arguments to the matching
+ * {@link SeaTunnelRuntimeException} constructor; no extra state is kept.
+ */
+public class MaxcomputeConnectorException extends SeaTunnelRuntimeException {
+
+    /**
+     * @param errorCode error code passed to the superclass
+     * @param cause underlying failure
+     */
+    public MaxcomputeConnectorException(SeaTunnelErrorCode errorCode, Throwable cause) {
+        super(errorCode, cause);
+    }
+
+    /**
+     * @param errorCode error code passed to the superclass
+     * @param message human-readable description of the failure
+     */
+    public MaxcomputeConnectorException(SeaTunnelErrorCode errorCode, String message) {
+        super(errorCode, message);
+    }
+
+    /**
+     * @param errorCode error code passed to the superclass
+     * @param message human-readable description of the failure
+     * @param cause underlying failure
+     */
+    public MaxcomputeConnectorException(SeaTunnelErrorCode errorCode,
+                                        String message,
+                                        Throwable cause) {
+        super(errorCode, message, cause);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
new file mode 100644
index 000000000..cae4005b7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sink registered under the plugin name {@code "Maxcompute"}.
+ *
+ * <p>{@link #prepare(Config)} stores the plugin configuration and calls
+ * {@link MaxcomputeUtil#initTableOrPartition(Config)}; {@link #createWriter} hands the stored
+ * row type and configuration to a new {@link MaxcomputeWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSink.class);
+    private Config pluginConfig;
+    private SeaTunnelRowType typeInfo;
+
+    /** Remembers the configuration, then initialises the target table or partition. */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        pluginConfig = config;
+        MaxcomputeUtil.initTableOrPartition(config);
+    }
+
+    /** @return the plugin name, {@code "Maxcompute"} */
+    @Override
+    public String getPluginName() {
+        return "Maxcompute";
+    }
+
+    /** @return the row type previously supplied through {@link #setTypeInfo} */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return typeInfo;
+    }
+
+    /** Records the row type of the incoming rows. */
+    @Override
+    public void setTypeInfo(SeaTunnelRowType rowType) {
+        typeInfo = rowType;
+    }
+
+    /** Builds a writer from the stored row type and plugin configuration. */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
+        return new MaxcomputeWriter(typeInfo, pluginConfig);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
new file mode 100644
index 000000000..92ef0e664
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the Maxcompute sink: reports the identifier {@code "Maxcompute"} and the rule
+ * describing which options are required and which are optional.
+ */
+@AutoService(Factory.class)
+public class MaxcomputeSinkFactory implements TableSinkFactory {
+
+    /**
+     * @return a rule requiring access id, access key, endpoint, project and table name, with
+     *     partition spec and overwrite as optional settings
+     */
+    @Override
+    public OptionRule optionRule() {
+        final OptionRule sinkRule = OptionRule.builder()
+            .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+            .optional(PARTITION_SPEC, OVERWRITE)
+            .build();
+        return sinkRule;
+    }
+
+    /** @return the factory identifier, {@code "Maxcompute"} */
+    @Override
+    public String factoryIdentifier() {
+        return "Maxcompute";
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
new file mode 100644
index 000000000..d2dcb0d23
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.RecordWriter;
+import com.aliyun.odps.tunnel.TableTunnel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * Sink writer that sends rows to a Maxcompute table through a tunnel upload session.
+ *
+ * <p>The constructor creates one upload session (scoped to the configured partition when
+ * {@code partition_spec} is present) and opens one record writer on a single block.
+ * {@link #close()} closes that record writer and commits the same block.
+ */
+@Slf4j
+public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final RecordWriter recordWriter;
+    private final TableTunnel.UploadSession session;
+    private final TableSchema tableSchema;
+    private final Config pluginConfig;
+
+    /**
+     * Block id used both to open the record writer and to commit the session. It is captured
+     * once so that {@link #close()} commits exactly the block that was written, even when it
+     * runs on a different thread than the constructor.
+     */
+    private final long blockId;
+
+    /**
+     * Opens the upload session and record writer for the configured table.
+     *
+     * @param seaTunnelRowType row type of the rows passed to {@link #write(SeaTunnelRow)}
+     * @param pluginConfig sink configuration; project and table name are read from it, and
+     *     the partition spec when present
+     * @throws MaxcomputeConnectorException with {@code WRITER_OPERATION_FAILED} when any step
+     *     of the setup fails
+     */
+    public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config pluginConfig) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.pluginConfig = pluginConfig;
+        // Same value the block id had before (constructor thread id), now remembered.
+        this.blockId = Thread.currentThread().getId();
+        try {
+            Table table = MaxcomputeUtil.getTable(pluginConfig);
+            this.tableSchema = table.getSchema();
+            TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
+            if (this.pluginConfig.hasPath(PARTITION_SPEC.key())) {
+                PartitionSpec partitionSpec =
+                    new PartitionSpec(this.pluginConfig.getString(PARTITION_SPEC.key()));
+                session = tunnel.createUploadSession(
+                    pluginConfig.getString(PROJECT.key()),
+                    pluginConfig.getString(TABLE_NAME.key()),
+                    partitionSpec);
+            } else {
+                session = tunnel.createUploadSession(
+                    pluginConfig.getString(PROJECT.key()),
+                    pluginConfig.getString(TABLE_NAME.key()));
+            }
+            this.recordWriter = session.openRecordWriter(blockId);
+            log.info("open record writer success");
+        } catch (Exception e) {
+            throw new MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+        }
+    }
+
+    /**
+     * Converts the row to a Maxcompute record and appends it to the open block.
+     *
+     * @param seaTunnelRow row to write
+     * @throws IOException if the record writer fails
+     */
+    @Override
+    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
+        Record record =
+            MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.seaTunnelRowType);
+        recordWriter.write(record);
+    }
+
+    /**
+     * Closes the record writer, then commits the block it wrote.
+     *
+     * @throws IOException if closing the record writer fails; the session is then not committed
+     * @throws MaxcomputeConnectorException with {@code WRITER_OPERATION_FAILED} when the commit
+     *     fails
+     */
+    @Override
+    public void close() throws IOException {
+        this.recordWriter.close();
+        try {
+            // Commit the block opened in the constructor, not one derived from the closing thread.
+            this.session.commit(new Long[]{blockId});
+        } catch (Exception e) {
+            throw new MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
new file mode 100644
index 000000000..b7381739a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class MaxcomputeSource implements SeaTunnelSource<SeaTunnelRow,
MaxcomputeSourceSplit, MaxcomputeSourceState> {
+ private SeaTunnelRowType typeInfo;
+ private Config pluginConfig;
+
+ @Override
+ public String getPluginName() {
+ return "Maxcompute";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) {
+ this.typeInfo = MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig);
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public SeaTunnelRowType getProducedType() {
+ return this.typeInfo;
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, MaxcomputeSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ return new MaxcomputeSourceReader(this.pluginConfig, readerContext,
this.typeInfo);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState>
createEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit>
enumeratorContext) throws Exception {
+ return new MaxcomputeSourceSplitEnumerator(enumeratorContext,
this.pluginConfig);
+ }
+
+ @Override
+ public SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit>
enumeratorContext, MaxcomputeSourceState checkpointState) throws Exception {
+ return new MaxcomputeSourceSplitEnumerator(enumeratorContext,
this.pluginConfig, checkpointState);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
new file mode 100644
index 000000000..152ca8e54
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the Maxcompute source: reports the identifier {@code "Maxcompute"} and the rule
+ * describing which options are required and which are optional.
+ */
+@AutoService(Factory.class)
+public class MaxcomputeSourceFactory implements TableSourceFactory {
+
+    /**
+     * @return a rule requiring access id, access key, endpoint, project and table name, with
+     *     partition spec and split row as optional settings
+     */
+    @Override
+    public OptionRule optionRule() {
+        final OptionRule sourceRule = OptionRule.builder()
+            .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+            .optional(PARTITION_SPEC, SPLIT_ROW)
+            .build();
+        return sourceRule;
+    }
+
+    /** @return the factory identifier, {@code "Maxcompute"} */
+    @Override
+    public String factoryIdentifier() {
+        return "Maxcompute";
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
new file mode 100644
index 000000000..c3e978b0f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Source reader that downloads the rows of its assigned splits through a Maxcompute tunnel
+ * download session and emits them as {@link SeaTunnelRow}s.
+ *
+ * <p>A split is dropped from the reader as soon as all of its rows have been emitted, so a
+ * later {@link #pollNext(Collector)} call never reads the same split twice and
+ * {@link #snapshotState(long)} only reports splits that still have to be read.
+ */
+@Slf4j
+public class MaxcomputeSourceReader implements SourceReader<SeaTunnelRow, MaxcomputeSourceSplit> {
+    private final SourceReader.Context context;
+    /** Splits assigned to this reader that have not been completely emitted yet. */
+    private final Set<MaxcomputeSourceSplit> sourceSplits;
+    private Config pluginConfig;
+    boolean noMoreSplit;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    /**
+     * @param pluginConfig     connector configuration used to open download sessions
+     * @param context          reader context used to query boundedness and signal completion
+     * @param seaTunnelRowType row type the Maxcompute records are converted to
+     */
+    public MaxcomputeSourceReader(Config pluginConfig, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.sourceSplits = new HashSet<>();
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Emits every row of every pending split, then signals the end of input once the
+     * enumerator has announced that no more splits will arrive and the source is bounded.
+     *
+     * @param output collector receiving the converted rows
+     * @throws MaxcomputeConnectorException if a split cannot be read
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        // Iterate over a copy so finished splits can be removed from the live set.
+        for (MaxcomputeSourceSplit split : new ArrayList<>(sourceSplits)) {
+            readSplit(split, output);
+            // The split is fully emitted: forget it, otherwise the next poll would download
+            // and emit the very same rows again.
+            sourceSplits.remove(split);
+        }
+        if (this.noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded Maxcompute source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    /** Downloads one split and forwards each record to {@code output}; always closes the record reader. */
+    private void readSplit(MaxcomputeSourceSplit split, Collector<SeaTunnelRow> output) {
+        try {
+            TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(pluginConfig);
+            TunnelRecordReader recordReader = session.openRecordReader(split.getSplitId(), split.getRowNum());
+            try {
+                log.info("open record reader success");
+                Record record;
+                while ((record = recordReader.read()) != null) {
+                    SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, seaTunnelRowType);
+                    output.collect(seaTunnelRow);
+                }
+            } finally {
+                // Release the tunnel connection even when reading or conversion fails.
+                recordReader.close();
+            }
+        } catch (Exception e) {
+            throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
+        }
+    }
+
+    /**
+     * @param checkpointId id of the checkpoint being taken (unused)
+     * @return a copy of the splits that have not been completely emitted yet
+     */
+    @Override
+    public List<MaxcomputeSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<MaxcomputeSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        this.noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java
new file mode 100644
index 000000000..4ab73c335
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.Getter;
+
+public class MaxcomputeSourceSplit implements SourceSplit {
+ @Getter
+ private int splitId;
+ @Getter
+ private long rowNum;
+
+ public MaxcomputeSourceSplit(int splitId, long rowNum) {
+ this.splitId = splitId;
+ this.rowNum = rowNum;
+ }
+
+ @Override
+ public String splitId() {
+ return String.valueOf(this.splitId);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
new file mode 100644
index 000000000..ad2363f3d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Splits a Maxcompute table (or partition) into row ranges and hands them to the registered
+ * readers.
+ *
+ * <p>Splits are discovered once in {@link #run()}: the record count of a download session is
+ * divided among the current readers, and every reader share is cut into chunks of at most
+ * {@code split_row} rows. Splits recorded as assigned in restored state are not produced again.
+ */
+@Slf4j
+public class MaxcomputeSourceSplitEnumerator implements SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState> {
+    private final Context<MaxcomputeSourceSplit> enumeratorContext;
+    /** Splits waiting to be assigned, keyed by the index of the reader that owns them. */
+    private final Map<Integer, Set<MaxcomputeSourceSplit>> pendingSplits;
+    /** Splits that have already been handed to a reader. */
+    private Set<MaxcomputeSourceSplit> assignedSplits;
+    private Config pluginConfig;
+
+    /**
+     * Creates an enumerator with no previously assigned splits.
+     *
+     * @param enumeratorContext context used to query readers and assign splits
+     * @param pluginConfig      connector configuration
+     */
+    public MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit> enumeratorContext, Config pluginConfig) {
+        this.enumeratorContext = enumeratorContext;
+        this.pluginConfig = pluginConfig;
+        this.pendingSplits = new HashMap<>();
+        this.assignedSplits = new HashSet<>();
+    }
+
+    /**
+     * Creates an enumerator that resumes from checkpointed state.
+     *
+     * @param enumeratorContext context used to query readers and assign splits
+     * @param pluginConfig      connector configuration
+     * @param sourceState       restored state; its assigned splits are copied, a missing state
+     *                          or missing set is treated as "nothing assigned yet"
+     */
+    public MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit> enumeratorContext, Config pluginConfig,
+                                           MaxcomputeSourceState sourceState) {
+        this(enumeratorContext, pluginConfig);
+        // Copy instead of aliasing so later assignments never mutate the restored state object.
+        if (sourceState != null && sourceState.getAssignedSplit() != null) {
+            this.assignedSplits.addAll(sourceState.getAssignedSplit());
+        }
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Puts the returned splits back into the pending map.
+     *
+     * <p>NOTE(review): within this class pending splits are only handed out from
+     * {@link #run()}; {@link #registerReader(int)} and {@link #handleSplitRequest(int)} are
+     * no-ops, so splits added back here are not re-assigned unless {@code run()} runs again.
+     */
+    @Override
+    public void addSplitsBack(List<MaxcomputeSourceSplit> splits, int subtaskId) {
+        addSplitChangeToPendingAssignments(splits);
+    }
+
+    /**
+     * @return the total number of splits that are still waiting to be assigned
+     */
+    @Override
+    public int currentUnassignedSplitSize() {
+        // pendingSplits is keyed by reader, so its own size would count readers, not splits.
+        int unassigned = 0;
+        for (Set<MaxcomputeSourceSplit> splitsOfReader : pendingSplits.values()) {
+            unassigned += splitsOfReader.size();
+        }
+        return unassigned;
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+    }
+
+    /**
+     * @param checkpointId id of the checkpoint being taken (unused)
+     * @return state holding a copy of the splits assigned so far
+     */
+    @Override
+    public MaxcomputeSourceState snapshotState(long checkpointId) {
+        // Snapshot a copy: the live set keeps changing while the state may still be serialized.
+        return new MaxcomputeSourceState(new HashSet<>(assignedSplits));
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+    }
+
+    /**
+     * Computes all splits of the table, drops the ones already assigned and queues the rest.
+     *
+     * @throws TunnelException      declared for tunnel access
+     * @throws ArithmeticException  if a split would start beyond {@link Integer#MAX_VALUE};
+     *                              the split id is an {@code int}, so larger offsets cannot be
+     *                              represented and failing is safer than wrapping around
+     */
+    private void discoverySplits() throws TunnelException {
+        TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(this.pluginConfig);
+        long recordCount = session.getRecordCount();
+        int numReaders = enumeratorContext.currentParallelism();
+        int splitRow = resolveSplitRow();
+        // Ceiling division in long arithmetic; no readers means no splits.
+        long rowsPerReader = numReaders > 0 ? (recordCount + numReaders - 1) / numReaders : 0L;
+
+        // Compare by value so splits restored from serialized state are recognised.
+        Set<String> alreadyAssigned = new HashSet<>();
+        for (MaxcomputeSourceSplit assigned : assignedSplits) {
+            alreadyAssigned.add(splitKey(assigned));
+        }
+
+        Set<MaxcomputeSourceSplit> allSplit = new HashSet<>();
+        for (int i = 0; i < numReaders; i++) {
+            long readerStart = i * rowsPerReader;
+            long readerEnd = Math.min(readerStart + rowsPerReader, recordCount);
+            for (long start = readerStart; start < readerEnd; start += splitRow) {
+                MaxcomputeSourceSplit split =
+                    new MaxcomputeSourceSplit(Math.toIntExact(start), Math.min(splitRow, readerEnd - start));
+                if (!alreadyAssigned.contains(splitKey(split))) {
+                    allSplit.add(split);
+                }
+            }
+        }
+        addSplitChangeToPendingAssignments(allSplit);
+        log.debug("Assigned {} to {} readers.", allSplit, numReaders);
+        log.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
+    }
+
+    /**
+     * Reads the configured split size, falling back to the option default.
+     *
+     * @throws IllegalArgumentException if the value is not positive (a step of zero or less
+     *                                  would make the split loop never terminate)
+     */
+    private int resolveSplitRow() {
+        int splitRow = SPLIT_ROW.defaultValue();
+        if (this.pluginConfig.hasPath(SPLIT_ROW.key())) {
+            splitRow = this.pluginConfig.getInt(SPLIT_ROW.key());
+        }
+        if (splitRow <= 0) {
+            throw new IllegalArgumentException(
+                String.format("Option '%s' must be a positive number of rows, but was %d", SPLIT_ROW.key(), splitRow));
+        }
+        return splitRow;
+    }
+
+    /** Value-based identity of a split: start offset plus row count. */
+    private static String splitKey(MaxcomputeSourceSplit split) {
+        return split.getSplitId() + ":" + split.getRowNum();
+    }
+
+    /**
+     * Queues the given splits for their owning reader.
+     *
+     * <p>The owner is derived from the chunk index ({@code offset / split_row}) rather than
+     * from the raw row offset: offsets advance in steps of {@code split_row}, so
+     * {@code offset % parallelism} would send almost every split to the same reader whenever
+     * {@code split_row} is a multiple of the parallelism.
+     */
+    private void addSplitChangeToPendingAssignments(Collection<MaxcomputeSourceSplit> newSplits) {
+        if (newSplits.isEmpty()) {
+            return;
+        }
+        int parallelism = enumeratorContext.currentParallelism();
+        int splitRow = resolveSplitRow();
+        for (MaxcomputeSourceSplit split : newSplits) {
+            int ownerReader = (split.getSplitId() / splitRow) % parallelism;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+    }
+
+    /**
+     * Hands every registered reader its pending splits, records them as assigned and tells
+     * the reader that no further splits will follow.
+     */
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<MaxcomputeSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+                // Mark pending splits as already assigned
+                assignedSplits.addAll(pendingAssignmentForReader);
+                // Assign pending splits to reader
+                log.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader);
+                enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
+            }
+            enumeratorContext.signalNoMoreSplits(pendingReader);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java
new file mode 100644
index 000000000..1abcb000c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Serializable checkpoint state of the Maxcompute split enumerator: the set of splits that
+ * have already been assigned.
+ */
+public class MaxcomputeSourceState implements Serializable {
+    /** Splits recorded as assigned; stored by reference, exactly as passed in. */
+    private final Set<MaxcomputeSourceSplit> assignedSplit;
+
+    /**
+     * @param assignedSplit the splits to record as assigned
+     */
+    public MaxcomputeSourceState(Set<MaxcomputeSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    /**
+     * @return the set given at construction time (not a copy)
+     */
+    public Set<MaxcomputeSourceSplit> getAssignedSplit() {
+        return this.assignedSplit;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
new file mode 100644
index 000000000..fa6382c4f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import static com.aliyun.odps.OdpsType.ARRAY;
+import static com.aliyun.odps.OdpsType.BIGINT;
+import static com.aliyun.odps.OdpsType.BINARY;
+import static com.aliyun.odps.OdpsType.BOOLEAN;
+import static com.aliyun.odps.OdpsType.DATE;
+import static com.aliyun.odps.OdpsType.DECIMAL;
+import static com.aliyun.odps.OdpsType.DOUBLE;
+import static com.aliyun.odps.OdpsType.FLOAT;
+import static com.aliyun.odps.OdpsType.INT;
+import static com.aliyun.odps.OdpsType.MAP;
+import static com.aliyun.odps.OdpsType.SMALLINT;
+import static com.aliyun.odps.OdpsType.STRING;
+import static com.aliyun.odps.OdpsType.TIMESTAMP;
+import static com.aliyun.odps.OdpsType.TINYINT;
+import static com.aliyun.odps.OdpsType.VOID;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Static helpers that translate between the Maxcompute (ODPS) and SeaTunnel type systems and
+ * convert row values between the two representations.
+ */
+@Slf4j
+public class MaxcomputeTypeMapper implements Serializable {
+
+    /** Zone used when an epoch-millisecond timestamp is turned into a {@link LocalDateTime}. */
+    private static final ZoneId TIMESTAMP_ZONE = ZoneId.of("+8");
+
+    /**
+     * Maps a Maxcompute type to the SeaTunnel type used to carry its values.
+     *
+     * @throws MaxcomputeConnectorException for types without a mapping
+     */
+    private static SeaTunnelDataType<?> maxcomputeType2SeaTunnelType(TypeInfo typeInfo) {
+        switch (typeInfo.getOdpsType()) {
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case DECIMAL:
+                return mappingDecimalType((DecimalTypeInfo) typeInfo);
+            case MAP:
+                return mappingMapType((MapTypeInfo) typeInfo);
+            case ARRAY:
+                return mappingListType((ArrayTypeInfo) typeInfo);
+            case VOID:
+                return BasicType.VOID_TYPE;
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return BasicType.INT_TYPE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case STRUCT:
+                return mappingStructType((StructTypeInfo) typeInfo);
+            case INTERVAL_DAY_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case INTERVAL_YEAR_MONTH:
+            default:
+                throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format(
+                    "Doesn't support Maxcompute type '%s' .",
+                    typeInfo.getTypeName()));
+        }
+    }
+
+    /** Carries precision and scale over unchanged. */
+    private static DecimalType mappingDecimalType(DecimalTypeInfo decimalTypeInfo) {
+        return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+    }
+
+    /** Maps key and value types recursively. */
+    private static MapType mappingMapType(MapTypeInfo mapTypeInfo) {
+        return new MapType(maxcomputeType2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()),
+            maxcomputeType2SeaTunnelType(mapTypeInfo.getValueTypeInfo()));
+    }
+
+    /**
+     * Maps an array type according to its <em>element</em> type. The array's own ODPS type is
+     * always {@code ARRAY}, so it cannot be used to pick the SeaTunnel array type.
+     *
+     * @throws MaxcomputeConnectorException if the element type has no SeaTunnel array type
+     */
+    private static ArrayType<?, ?> mappingListType(ArrayTypeInfo arrayTypeInfo) {
+        switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
+            case BOOLEAN:
+                return ArrayType.BOOLEAN_ARRAY_TYPE;
+            case INT:
+                return ArrayType.INT_ARRAY_TYPE;
+            case BIGINT:
+                return ArrayType.LONG_ARRAY_TYPE;
+            case FLOAT:
+                return ArrayType.FLOAT_ARRAY_TYPE;
+            case DOUBLE:
+                return ArrayType.DOUBLE_ARRAY_TYPE;
+            case STRING:
+                return ArrayType.STRING_ARRAY_TYPE;
+            default:
+                throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format(
+                    "Doesn't support Maxcompute type '%s' .",
+                    arrayTypeInfo.getTypeName()));
+        }
+    }
+
+    /**
+     * Maps a struct to a row type, taking field names from the struct definition and mapping
+     * each field type recursively.
+     */
+    private static SeaTunnelRowType mappingStructType(StructTypeInfo structType) {
+        List<String> structFieldNames = structType.getFieldNames();
+        List<TypeInfo> fields = structType.getFieldTypeInfos();
+        List<String> fieldNames = new ArrayList<>(fields.size());
+        List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(fields.size());
+        for (int i = 0; i < fields.size(); i++) {
+            // The field's own name, not the name of its type.
+            fieldNames.add(structFieldNames.get(i));
+            fieldTypes.add(maxcomputeType2SeaTunnelType(fields.get(i)));
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[0]),
+            fieldTypes.toArray(new SeaTunnelDataType[0]));
+    }
+
+    /**
+     * Maps a SeaTunnel type to the Maxcompute type category.
+     *
+     * @throws MaxcomputeConnectorException for types without a mapping (including TIME)
+     */
+    private static OdpsType seaTunnelType2MaxcomputeType(SeaTunnelDataType<?> seaTunnelDataType) {
+        switch (seaTunnelDataType.getSqlType()) {
+            case ARRAY:
+                return ARRAY;
+            case MAP:
+                return MAP;
+            case STRING:
+                return STRING;
+            case BOOLEAN:
+                return BOOLEAN;
+            case TINYINT:
+                return TINYINT;
+            case SMALLINT:
+                return SMALLINT;
+            case INT:
+                return INT;
+            case BIGINT:
+                return BIGINT;
+            case FLOAT:
+                return FLOAT;
+            case DOUBLE:
+                return DOUBLE;
+            case DECIMAL:
+                return DECIMAL;
+            case BYTES:
+                return BINARY;
+            case DATE:
+                return DATE;
+            case TIMESTAMP:
+                return TIMESTAMP;
+            case NULL:
+                return VOID;
+            case TIME:
+            default:
+                throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format(
+                    "Doesn't support SeaTunnelDataType type '%s' .",
+                    seaTunnelDataType.getSqlType()));
+        }
+    }
+
+    /**
+     * Builds the SeaTunnel row type of the configured table from its column names and types.
+     *
+     * @param pluginConfig connector configuration identifying the table
+     * @return row type with one field per table column, in column order
+     * @throws MaxcomputeConnectorException if the schema cannot be read or a column type is
+     *                                      not supported
+     */
+    public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) {
+        Table table = MaxcomputeUtil.getTable(pluginConfig);
+        TableSchema tableSchema = table.getSchema();
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+            // Fetch the column list once instead of on every loop step.
+            List<Column> columns = tableSchema.getColumns();
+            for (Column column : columns) {
+                fieldNames.add(column.getName());
+                seaTunnelDataTypes.add(maxcomputeType2SeaTunnelType(column.getTypeInfo()));
+            }
+        } catch (Exception e) {
+            throw new MaxcomputeConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, e);
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]),
+            seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+    }
+
+    /**
+     * Builds a Maxcompute table schema with one column per field of the row type.
+     *
+     * @throws MaxcomputeConnectorException if a field type is not supported
+     */
+    public static TableSchema seaTunnelRowType2TableSchema(SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = new TableSchema();
+        for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+            OdpsType odpsType = seaTunnelType2MaxcomputeType(seaTunnelRowType.getFieldType(i));
+            Column column = new Column(seaTunnelRowType.getFieldName(i), odpsType);
+            tableSchema.addColumn(column);
+        }
+        return tableSchema;
+    }
+
+    /**
+     * Normalises a single value according to the SeaTunnel type of its field. Used for both
+     * reading from and writing to Maxcompute.
+     *
+     * @param field     the raw value, may be {@code null}
+     * @param fieldType SeaTunnel type of the field
+     * @return the converted value, or {@code null} for a {@code null} input
+     * @throws MaxcomputeConnectorException if the type (or an array element type) is not supported
+     */
+    private static Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
+        if (field == null) {
+            return null;
+        }
+        switch (fieldType.getSqlType()) {
+            case ARRAY:
+                return resolveArray(field, ((ArrayType<?, ?>) fieldType).getElementType());
+            case MAP:
+                HashMap<Object, Object> dataMap = new HashMap<>();
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
+                // Accept any Map implementation, not only HashMap.
+                java.util.Map<?, ?> origDataMap = (java.util.Map<?, ?>) field;
+                origDataMap.forEach((key, value) ->
+                    dataMap.put(resolveObject(key, keyType), resolveObject(value, valueType)));
+                return dataMap;
+            case INT:
+                // TINYINT and SMALLINT columns are declared as INT by maxcomputeType2SeaTunnelType,
+                // so narrower numeric values are widened to Integer here.
+                return field instanceof Number ? ((Number) field).intValue() : field;
+            case BOOLEAN:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+            case DATE:
+                return field;
+            case STRING:
+                return field.toString();
+            case TINYINT:
+                return Byte.parseByte(field.toString());
+            case SMALLINT:
+                return Short.parseShort(field.toString());
+            case NULL:
+                return null;
+            case BYTES:
+                if (field instanceof byte[]) {
+                    return field;
+                }
+                // Read through a duplicate so the caller's buffer position is left untouched.
+                ByteBuffer buffer = ((ByteBuffer) field).duplicate();
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes, 0, bytes.length);
+                return bytes;
+            case TIMESTAMP:
+                if (field instanceof LocalDateTime) {
+                    // Already in the SeaTunnel representation.
+                    // NOTE(review): confirm the writer accepts LocalDateTime for TIMESTAMP columns.
+                    return field;
+                }
+                // Epoch milliseconds either directly or via java.util.Date (java.sql.Timestamp
+                // is a subclass). NOTE(review): the fixed +8 zone is kept from the original code.
+                long epochMillis = field instanceof java.util.Date
+                    ? ((java.util.Date) field).getTime()
+                    : (Long) field;
+                return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), TIMESTAMP_ZONE);
+            default:
+                // do nothing
+                // never got in there
+                throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "SeaTunnel not support this data type now");
+        }
+    }
+
+    /**
+     * Copies the elements of a collection or object array into a typed Java array matching
+     * the SeaTunnel element type.
+     *
+     * @throws MaxcomputeConnectorException if the value is neither a collection nor an object
+     *                                      array, or the element type is not supported
+     */
+    private static Object resolveArray(Object field, SeaTunnelDataType<?> elementType) {
+        List<Object> origArray = new ArrayList<>();
+        if (field instanceof java.util.Collection) {
+            origArray.addAll((java.util.Collection<?>) field);
+        } else if (field instanceof Object[]) {
+            origArray.addAll(java.util.Arrays.asList((Object[]) field));
+        } else {
+            throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, String.format(
+                "Cannot read array elements from a value of class '%s'", field.getClass().getName()));
+        }
+        switch (elementType.getSqlType()) {
+            case STRING:
+                return origArray.toArray(new String[0]);
+            case BOOLEAN:
+                return origArray.toArray(new Boolean[0]);
+            case INT:
+                return origArray.toArray(new Integer[0]);
+            case BIGINT:
+                return origArray.toArray(new Long[0]);
+            case FLOAT:
+                return origArray.toArray(new Float[0]);
+            case DOUBLE:
+                return origArray.toArray(new Double[0]);
+            default:
+                String errorMsg = String.format("SeaTunnel array type not support this type [%s] now",
+                    elementType.getSqlType());
+                throw new MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        }
+    }
+
+    /**
+     * Converts a Maxcompute record into a SeaTunnel row, column by column.
+     *
+     * @param rs       the record to convert
+     * @param typeInfo row type providing the SeaTunnel type of each column position
+     * @return a row holding one converted value per record column
+     * @throws SQLException declared for compatibility with existing callers
+     */
+    public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType typeInfo) throws SQLException {
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+        for (int i = 0; i < rs.getColumns().length; i++) {
+            fields.add(resolveObject(rs.get(i), seaTunnelDataTypes[i]));
+        }
+        return new SeaTunnelRow(fields.toArray());
+    }
+
+    /**
+     * Converts a SeaTunnel row into a Maxcompute record whose schema is derived from the row type.
+     *
+     * @param seaTunnelRow     the row to convert
+     * @param seaTunnelRowType type of the row
+     * @return a record holding one converted value per row field
+     */
+    public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = seaTunnelRowType2TableSchema(seaTunnelRowType);
+        ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
+        for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
+            arrayRecord.set(i, resolveObject(seaTunnelRow.getField(i), seaTunnelRowType.getFieldType(i)));
+        }
+        return arrayRecord;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
new file mode 100644
index 000000000..4bf37b95b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Static helpers that build Maxcompute (ODPS) client objects from the connector configuration.
+ */
+@Slf4j
+public class MaxcomputeUtil {
+
+    /**
+     * Looks up the configured table through a freshly built ODPS client.
+     *
+     * @param pluginConfig connector configuration
+     * @return handle of the table named by the table-name option
+     */
+    public static Table getTable(Config pluginConfig) {
+        return getOdps(pluginConfig).tables().get(pluginConfig.getString(TABLE_NAME.key()));
+    }
+
+    /**
+     * @param pluginConfig connector configuration
+     * @return a new table tunnel bound to a freshly built ODPS client
+     */
+    public static TableTunnel getTableTunnel(Config pluginConfig) {
+        return new TableTunnel(getOdps(pluginConfig));
+    }
+
+    /**
+     * Builds an ODPS client from the access id, access key, endpoint and project options.
+     *
+     * @param pluginConfig connector configuration
+     * @return the configured client
+     */
+    public static Odps getOdps(Config pluginConfig) {
+        String accessId = pluginConfig.getString(ACCESS_ID.key());
+        String accessKey = pluginConfig.getString(ACCESS_KEY.key());
+        Odps client = new Odps(new AliyunAccount(accessId, accessKey));
+        client.setEndpoint(pluginConfig.getString(ENDPOINT.key()));
+        client.setDefaultProject(pluginConfig.getString(PROJECT.key()));
+        return client;
+    }
+
+    /**
+     * Opens a download session for the configured table, restricted to the configured
+     * partition when a partition spec is present.
+     *
+     * @param pluginConfig connector configuration
+     * @return the new download session
+     * @throws MaxcomputeConnectorException wrapping any failure while creating the session
+     */
+    public static TableTunnel.DownloadSession getDownloadSession(Config pluginConfig) {
+        TableTunnel tableTunnel = getTableTunnel(pluginConfig);
+        try {
+            if (!pluginConfig.hasPath(PARTITION_SPEC.key())) {
+                return tableTunnel.createDownloadSession(
+                    pluginConfig.getString(PROJECT.key()),
+                    pluginConfig.getString(TABLE_NAME.key()));
+            }
+            PartitionSpec spec = new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
+            return tableTunnel.createDownloadSession(
+                pluginConfig.getString(PROJECT.key()),
+                pluginConfig.getString(TABLE_NAME.key()),
+                spec);
+        } catch (Exception e) {
+            throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
+        }
+    }
+
+    /**
+     * Prepares the target before writing. With a partition spec: optionally deletes the
+     * partition (when overwrite is enabled), then creates it. Without one: optionally
+     * truncates the table. A {@link NullPointerException} raised by the delete / truncate call
+     * is logged at debug level and otherwise ignored.
+     *
+     * @param pluginConfig connector configuration
+     * @throws MaxcomputeConnectorException wrapping any other failure
+     */
+    public static void initTableOrPartition(Config pluginConfig) {
+        Boolean overwrite = OVERWRITE.defaultValue();
+        if (pluginConfig.hasPath(OVERWRITE.key())) {
+            overwrite = pluginConfig.getBoolean(OVERWRITE.key());
+        }
+        try {
+            Table target = MaxcomputeUtil.getTable(pluginConfig);
+            if (!pluginConfig.hasPath(PARTITION_SPEC.key())) {
+                // Unpartitioned target: only an overwrite requires any work.
+                if (overwrite) {
+                    try {
+                        target.truncate();
+                    } catch (NullPointerException e) {
+                        log.debug("NullPointerException when truncate table");
+                    }
+                }
+                return;
+            }
+            PartitionSpec spec = new PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
+            if (overwrite) {
+                try {
+                    target.deletePartition(spec, true);
+                } catch (NullPointerException e) {
+                    log.debug("NullPointerException when delete table partition");
+                }
+            }
+            target.createPartition(spec, true);
+        } catch (Exception e) {
+            throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf
b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf
new file mode 100644
index 000000000..403876619
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 2
+ job.mode = "STREAMING"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ MaxcomputeSource {
+ accessId="<your access id>"
+ accesskey="<your access Key>"
+ endpoint="<http://service.odps.aliyun.com/api>"
+ project="<your project>"
+ table_name="<your table name>"
+ #partition_spec="<your partition spec>"
+ #split_row = 10000
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+ sql {
+ source_table_name = "fake"
+ sql = "select * from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
+}
+
+sink {
+ MaxcomputeSink {
+ accessId="<your access id>"
+ accesskey="<your access Key>"
+ endpoint="<http://service.odps.aliyun.com/api>"
+ project="<your project>"
+ result_table_name="<your table name>"
+ #partition_spec="<your partition spec>"
+ #overwrite = false
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java
new file mode 100644
index 000000000..da14d0680
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+
+public class BasicTypeToOdpsTypeTest {
+
+ private static void testType(String fieldName, SeaTunnelDataType<?>
seaTunnelDataType, OdpsType odpsType, Object object) throws SQLException {
+ SeaTunnelRowType typeInfo = new SeaTunnelRowType(new String[]{
+ fieldName
+ }, new SeaTunnelDataType<?>[]{
+ seaTunnelDataType
+ });
+
+ ArrayRecord record = new ArrayRecord(new Column[]{
+ new Column(fieldName, odpsType)
+ });
+ record.set(fieldName, object);
+
+ SeaTunnelRow seaTunnelRow =
MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
+ Record tRecord =
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, typeInfo);
+
+ for (int i = 0; i < tRecord.getColumns().length; i++) {
+ Assertions.assertEquals(record.get(i), tRecord.get(i));
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ void testSTRING_TYPE_2_STRING() {
+ testType("STRING_TYPE_2_STRING", BasicType.STRING_TYPE,
OdpsType.STRING, "hello");
+ }
+
+ @SneakyThrows
+ @Test
+ void testBOOLEAN_TYPE_2_BOOLEAN() {
+ testType("BOOLEAN_TYPE_2_BOOLEAN", BasicType.BOOLEAN_TYPE,
OdpsType.BOOLEAN, Boolean.TRUE);
+ }
+
+ @SneakyThrows
+ @Test
+ void testSHORT_TYPE_2_SMALLINT() {
+ testType("SHORT_TYPE_2_SMALLINT", BasicType.SHORT_TYPE,
OdpsType.SMALLINT, Short.MAX_VALUE);
+ }
+
+ @SneakyThrows
+ @Test
+ void testLONG_TYPE_2_BIGINT() {
+ testType("LONG_TYPE_2_BIGINT", BasicType.LONG_TYPE, OdpsType.BIGINT,
Long.MAX_VALUE);
+ }
+
+ @SneakyThrows
+ @Test
+ void testFLOAT_TYPE_2_FLOAT_TYPE() {
+ testType("FLOAT_TYPE_2_FLOAT_TYPE", BasicType.FLOAT_TYPE,
OdpsType.FLOAT, Float.MAX_VALUE);
+ }
+
+ @SneakyThrows
+ @Test
+ void testDOUBLE_TYPE_2_DOUBLE() {
+ testType("DOUBLE_TYPE_2_DOUBLE", BasicType.DOUBLE_TYPE,
OdpsType.DOUBLE, Double.MAX_VALUE);
+ }
+
+ @SneakyThrows
+ @Test
+ void testVOID_TYPE_2_VOID() {
+ testType("VOID_TYPE_2_VOID", BasicType.VOID_TYPE, OdpsType.VOID, null);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java
new file mode 100644
index 000000000..53627c2f6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class MaxcomputeSourceFactoryTest {
+ @Test
+ void optionRule() {
+ Assertions.assertNotNull((new MaxcomputeSourceFactory()).optionRule());
+ Assertions.assertNotNull((new MaxcomputeSinkFactory()).optionRule());
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 7a7a7d3af..3a0ba4cd4 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -65,6 +65,7 @@
<module>connector-slack</module>
<module>connector-rabbitmq</module>
<module>connector-openmldb</module>
+ <module>connector-maxcompute</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 5e8134f9d..c588db021 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -407,6 +407,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-maxcompute</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
@@ -575,4 +581,4 @@
</build>
</profile>
</profiles>
-</project>
+</project>
\ No newline at end of file