This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 946d89cb95 [Feature] Support catalog in MaxCompute Source (#5283)
946d89cb95 is described below

commit 946d89cb952571dbc90290a1027254a83f6dc5d9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Oct 25 18:15:31 2023 +0800

    [Feature] Support catalog in MaxCompute Source (#5283)
---
 docs/en/connector-v2/source/Maxcompute.md          |  15 +
 .../com/aliyun/odps/type/SimpleArrayTypeInfo.java  |  52 ----
 .../com/aliyun/odps/type/SimpleMapTypeInfo.java    |  63 -----
 .../com/aliyun/odps/type/SimpleStructTypeInfo.java | 102 -------
 .../maxcompute/catalog/MaxComputeCatalog.java      | 160 +++++++++++
 .../MaxComputeCatalogFactory.java}                 |  26 +-
 .../catalog/MaxComputeDataTypeConvertor.java       | 302 +++++++++++++++++++++
 .../maxcompute/config/MaxcomputeConfig.java        |   3 +
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java  |   4 +-
 .../maxcompute/sink/MaxcomputeSinkFactory.java     |   3 +-
 .../maxcompute/source/MaxcomputeSource.java        |  16 +-
 .../maxcompute/source/MaxcomputeSourceFactory.java |   6 +-
 .../maxcompute/util/MaxcomputeTypeMapper.java      | 111 +-------
 .../catalog/MaxComputeDataTypeConvertorTest.java   |  71 +++++
 .../maxcompute/source/MaxcomputeSourceTest.java    |  48 ++++
 15 files changed, 647 insertions(+), 335 deletions(-)

diff --git a/docs/en/connector-v2/source/Maxcompute.md 
b/docs/en/connector-v2/source/Maxcompute.md
index f30be5a0d2..cb9bc32dd3 100644
--- a/docs/en/connector-v2/source/Maxcompute.md
+++ b/docs/en/connector-v2/source/Maxcompute.md
@@ -26,6 +26,7 @@ Used to read data from Maxcompute.
 | partition_spec | string | no       | -             |
 | split_row      | int    | no       | 10000         |
 | common-options | string | no       |               |
+| schema         | config | no       |               |
 
 ### accessId [string]
 
@@ -59,6 +60,12 @@ Used to read data from Maxcompute.
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
 
+### schema [config]
+
+#### fields [config]
+
+The field names and data types of the upstream data, declared as `name = type` pairs.
+
 ## Examples
 
 ```hocon
@@ -71,6 +78,13 @@ source {
     table_name="<your table name>"
     #partition_spec="<your partition spec>"
     #split_row = 10000
+    schema {
+        fields {
+            name = string
+            age = int
+            gender = string 
+        }
+    }
   }
 }
 ```
@@ -80,4 +94,5 @@ source {
 ### next version
 
 - [Feature] Add Maxcompute Source 
Connector([3640](https://github.com/apache/seatunnel/pull/3640))
+- [Feature] Support Schema in MaxCompute Source([5283](https://github.com/apache/seatunnel/pull/5283))
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java
deleted file mode 100644
index 76fddc10c4..0000000000
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.odps.type;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-
-import com.aliyun.odps.OdpsType;
-
-public class SimpleArrayTypeInfo implements ArrayTypeInfo {
-    private final TypeInfo valueType;
-
-    SimpleArrayTypeInfo(TypeInfo typeInfo) {
-        if (typeInfo == null) {
-            throw new MaxcomputeConnectorException(
-                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid element 
type.");
-        } else {
-            this.valueType = typeInfo;
-        }
-    }
-
-    public String getTypeName() {
-        return this.getOdpsType().name() + "<" + this.valueType.getTypeName() 
+ ">";
-    }
-
-    public TypeInfo getElementTypeInfo() {
-        return this.valueType;
-    }
-
-    public OdpsType getOdpsType() {
-        return OdpsType.ARRAY;
-    }
-
-    public String toString() {
-        return this.getTypeName();
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java
deleted file mode 100644
index 6cedcd22bb..0000000000
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.odps.type;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-
-import com.aliyun.odps.OdpsType;
-
-public class SimpleMapTypeInfo implements MapTypeInfo {
-    private final TypeInfo keyType;
-    private final TypeInfo valueType;
-
-    SimpleMapTypeInfo(TypeInfo keyType, TypeInfo valueType) {
-        if (keyType != null && valueType != null) {
-            this.keyType = keyType;
-            this.valueType = valueType;
-        } else {
-            throw new MaxcomputeConnectorException(
-                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid key or 
value type for map.");
-        }
-    }
-
-    public String getTypeName() {
-        return this.getOdpsType().name()
-                + "<"
-                + this.keyType.getTypeName()
-                + ","
-                + this.valueType.getTypeName()
-                + ">";
-    }
-
-    public TypeInfo getKeyTypeInfo() {
-        return this.keyType;
-    }
-
-    public TypeInfo getValueTypeInfo() {
-        return this.valueType;
-    }
-
-    public OdpsType getOdpsType() {
-        return OdpsType.MAP;
-    }
-
-    public String toString() {
-        return this.getTypeName();
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java
deleted file mode 100644
index a628355221..0000000000
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.odps.type;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-
-import com.aliyun.odps.OdpsType;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SimpleStructTypeInfo implements StructTypeInfo {
-    private final List<String> fieldNames;
-    private final List<TypeInfo> fieldTypeInfos;
-
-    SimpleStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
-        this.validateParameters(names, typeInfos);
-        this.fieldNames = this.toLowerCase(names);
-        this.fieldTypeInfos = new ArrayList(typeInfos);
-    }
-
-    private List<String> toLowerCase(List<String> names) {
-        List<String> lowerNames = new ArrayList(names.size());
-        Iterator var3 = names.iterator();
-
-        while (var3.hasNext()) {
-            String name = (String) var3.next();
-            lowerNames.add(name.toLowerCase());
-        }
-
-        return lowerNames;
-    }
-
-    private void validateParameters(List<String> names, List<TypeInfo> 
typeInfos) {
-        if (names != null && typeInfos != null && !names.isEmpty() && 
!typeInfos.isEmpty()) {
-            if (names.size() != typeInfos.size()) {
-                throw new MaxcomputeConnectorException(
-                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        "The amount of field names must be equal to the amount 
of field types.");
-            }
-        } else {
-            throw new MaxcomputeConnectorException(
-                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                    "Invalid name or element type for struct.");
-        }
-    }
-
-    public String getTypeName() {
-        StringBuilder stringBuilder = new 
StringBuilder(this.getOdpsType().name());
-        stringBuilder.append("<");
-
-        for (int i = 0; i < this.fieldNames.size(); ++i) {
-            if (i > 0) {
-                stringBuilder.append(",");
-            }
-
-            stringBuilder.append((String) this.fieldNames.get(i));
-            stringBuilder.append(":");
-            stringBuilder.append(((TypeInfo) 
this.fieldTypeInfos.get(i)).getTypeName());
-        }
-
-        stringBuilder.append(">");
-        return stringBuilder.toString();
-    }
-
-    public List<String> getFieldNames() {
-        return this.fieldNames;
-    }
-
-    public List<TypeInfo> getFieldTypeInfos() {
-        return this.fieldTypeInfos;
-    }
-
-    public int getFieldCount() {
-        return this.fieldNames.size();
-    }
-
-    public OdpsType getOdpsType() {
-        return OdpsType.STRUCT;
-    }
-
-    public String toString() {
-        return this.getTypeName();
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
new file mode 100644
index 0000000000..b131277bd7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.Projects;
+import com.aliyun.odps.Tables;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+
+/**
+ * {@link Catalog} implementation backed by the MaxCompute (ODPS) SDK.
+ *
+ * <p>A MaxCompute project plays the role of a database. Only the existence checks and the
+ * listing operations are implemented; {@link #getTable(TablePath)} and every DDL operation
+ * throw {@link UnsupportedOperationException}.
+ */
+@Slf4j
+public class MaxComputeCatalog implements Catalog {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final String catalogName;
+
+    /** Credentials created by {@link #open()}; {@code null} until then. */
+    private Account account;
+
+    public MaxComputeCatalog(String catalogName, ReadonlyConfig options) {
+        this.readonlyConfig = options;
+        this.catalogName = catalogName;
+    }
+
+    /** Builds the account from the configured access id and access key. */
+    @Override
+    public void open() throws CatalogException {
+        account = new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
+    }
+
+    @Override
+    public void close() throws CatalogException {}
+
+    /** Returns the configured project, which is used as the default database. */
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return readonlyConfig.get(PROJECT);
+    }
+
+    /**
+     * Checks whether a project with the given name exists.
+     *
+     * @throws CatalogException if the SDK call fails
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        Projects projects = createOdps(readonlyConfig.get(PROJECT)).projects();
+        try {
+            return projects.exists(databaseName);
+        } catch (OdpsException e) {
+            throw new CatalogException("Check " + databaseName + " exist error", e);
+        }
+    }
+
+    /**
+     * Returns the configured project when it exists, otherwise an empty list.
+     *
+     * @throws CatalogException if the existence check fails
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            // todo: how to get all projects
+            String project = readonlyConfig.get(PROJECT);
+            if (databaseExists(project)) {
+                return Lists.newArrayList(project);
+            }
+            return Collections.emptyList();
+        } catch (Exception e) {
+            throw new CatalogException("List databases error", e);
+        }
+    }
+
+    /** Returns the names of all tables found in the given project. */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        Tables tables = createOdps(databaseName).tables();
+        List<String> tableNames = new ArrayList<>();
+        tables.forEach(table -> tableNames.add(table.getName()));
+        return tableNames;
+    }
+
+    /**
+     * Checks whether the table exists in the project named by the path's database.
+     *
+     * @throws CatalogException if the SDK call fails
+     */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        Tables tables = createOdps(tablePath.getDatabaseName()).tables();
+        try {
+            return tables.exists(tablePath.getTableName());
+        } catch (OdpsException e) {
+            throw new CatalogException("Check table " + tablePath + " exist error", e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Creates an ODPS client for the configured endpoint with the given default project. */
+    private Odps createOdps(String defaultProject) {
+        Odps odps = new Odps(account);
+        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+        odps.setDefaultProject(defaultProject);
+        return odps;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
similarity index 73%
copy from 
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
copy to 
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
index f5b4fd03b7..4ed29c0ea6 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
@@ -15,40 +15,44 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 
 import com.google.auto.service.AutoService;
 
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
 
 @AutoService(Factory.class)
-public class MaxcomputeSourceFactory implements TableSourceFactory {
+public class MaxComputeCatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new MaxComputeCatalog(catalogName, options);
+    }
+
     @Override
     public String factoryIdentifier() {
-        return "Maxcompute";
+        return PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
+        // Each option is listed exactly once; SCHEMA lets users declare the schema explicitly.
         return OptionRule.builder()
                 .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
-                .optional(PARTITION_SPEC, SPLIT_ROW)
+                .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA)
                 .build();
     }
-
-    @Override
-    public Class<? extends SeaTunnelSource> getSourceClass() {
-        return MaxcomputeSource.class;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
new file mode 100644
index 0000000000..4e48f938aa
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.google.auto.service.AutoService;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class MaxComputeDataTypeConvertor implements 
DataTypeConvertor<TypeInfo> {
+
+    /**
+     * Converts a MaxCompute type name, for example {@code MAP<STRING,INT>} or
+     * {@code DECIMAL(10,2)}, into the matching SeaTunnel type.
+     *
+     * <p>Parameterised types are recognised by prefix. Type arguments are split only on commas
+     * that are not enclosed in {@code <...>} or {@code (...)}, so nested types such as
+     * {@code STRUCT<a:DECIMAL(10,2),b:MAP<STRING,INT>>} are parsed correctly.
+     *
+     * @param connectorDataType upper-case MaxCompute type name
+     * @return the SeaTunnel type for {@code connectorDataType}
+     * @throws MaxcomputeConnectorException if the type is malformed or unsupported
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        if (connectorDataType.startsWith("MAP")) {
+            // MAP<key,value>
+            List<String> keyAndValue =
+                    splitTopLevel(
+                            connectorDataType.substring(4, connectorDataType.length() - 1));
+            if (keyAndValue.size() != 2) {
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Invalid map type: " + connectorDataType);
+            }
+            return new MapType(
+                    toSeaTunnelType(keyAndValue.get(0)), toSeaTunnelType(keyAndValue.get(1)));
+        }
+        if (connectorDataType.startsWith("ARRAY")) {
+            // ARRAY<element>
+            SeaTunnelDataType<?> seaTunnelType =
+                    toSeaTunnelType(connectorDataType.substring(6, connectorDataType.length() - 1));
+            switch (seaTunnelType.getSqlType()) {
+                case STRING:
+                    return ArrayType.STRING_ARRAY_TYPE;
+                case BOOLEAN:
+                    return ArrayType.BOOLEAN_ARRAY_TYPE;
+                case BYTES:
+                    return ArrayType.BYTE_ARRAY_TYPE;
+                case SMALLINT:
+                    return ArrayType.SHORT_ARRAY_TYPE;
+                case INT:
+                    return ArrayType.INT_ARRAY_TYPE;
+                case BIGINT:
+                    return ArrayType.LONG_ARRAY_TYPE;
+                case FLOAT:
+                    return ArrayType.FLOAT_ARRAY_TYPE;
+                case DOUBLE:
+                    return ArrayType.DOUBLE_ARRAY_TYPE;
+                default:
+                    throw new MaxcomputeConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported array element type: " + seaTunnelType);
+            }
+        }
+        if (connectorDataType.startsWith("STRUCT")) {
+            // STRUCT<field1:type1,field2:type2...>
+            List<String> entries =
+                    splitTopLevel(
+                            connectorDataType.substring(7, connectorDataType.length() - 1));
+            String[] fieldNames = new String[entries.size()];
+            SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[entries.size()];
+            for (int i = 0; i < entries.size(); i++) {
+                String entry = entries.get(i);
+                // Only the first ':' separates name and type; later ones belong to nested structs.
+                int colon = entry.indexOf(':');
+                if (colon < 0) {
+                    throw new MaxcomputeConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Invalid struct field [" + entry + "] in type: " + connectorDataType);
+                }
+                fieldNames[i] = entry.substring(0, colon);
+                fieldTypes[i] = toSeaTunnelType(entry.substring(colon + 1));
+            }
+            return new SeaTunnelRowType(fieldNames, fieldTypes);
+        }
+        if (connectorDataType.startsWith("DECIMAL")) {
+            // DECIMAL(precision,scale)
+            if (connectorDataType.contains("(")) {
+                String substring = connectorDataType.substring(8, connectorDataType.length() - 1);
+                String[] split = substring.split(",");
+                return new DecimalType(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
+            } else {
+                // No explicit precision/scale given: fall back to (54, 18).
+                return new DecimalType(54, 18);
+            }
+        }
+        if (connectorDataType.startsWith("CHAR") || connectorDataType.startsWith("VARCHAR")) {
+            // CHAR(n) or VARCHAR(n)
+            return BasicType.STRING_TYPE;
+        }
+        switch (connectorDataType) {
+            case "TINYINT":
+            case "BINARY":
+                // NOTE(review): the TypeInfo overload maps BINARY to PrimitiveByteArrayType;
+                // confirm whether BINARY should really be a single byte here.
+                return BasicType.BYTE_TYPE;
+            case "SMALLINT":
+                return BasicType.SHORT_TYPE;
+            case "INT":
+                return BasicType.INT_TYPE;
+            case "BIGINT":
+                return BasicType.LONG_TYPE;
+            case "FLOAT":
+                return BasicType.FLOAT_TYPE;
+            case "DOUBLE":
+                return BasicType.DOUBLE_TYPE;
+            case "STRING":
+                return BasicType.STRING_TYPE;
+            case "DATE":
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case "DATETIME":
+            case "TIMESTAMP":
+                // Both carry a date and a time of day, so they need the date-time type.
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case "TIME":
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case "BOOLEAN":
+                return BasicType.BOOLEAN_TYPE;
+            case "NULL":
+                return BasicType.VOID_TYPE;
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "SeaTunnel type not support this type [%s] now",
+                                connectorDataType));
+        }
+    }
+
+    /** Splits {@code text} on commas that are not nested inside {@code <...>} or {@code (...)}. */
+    private static List<String> splitTopLevel(String text) {
+        List<String> parts = new ArrayList<>();
+        int depth = 0;
+        int start = 0;
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            if (c == '<' || c == '(') {
+                depth++;
+            } else if (c == '>' || c == ')') {
+                depth--;
+            } else if (c == ',' && depth == 0) {
+                parts.add(text.substring(start, i));
+                start = i + 1;
+            }
+        }
+        parts.add(text.substring(start));
+        return parts;
+    }
+
+    /**
+     * Converts a MaxCompute {@link TypeInfo} into the matching SeaTunnel type.
+     *
+     * <p>Map and struct types are converted recursively. Arrays are supported only for
+     * boolean, int, bigint, float, double and string elements.
+     *
+     * @param connectorDataType the MaxCompute type to convert
+     * @param dataTypeProperties passed through to nested conversions; not otherwise read here
+     * @return the SeaTunnel type for {@code connectorDataType}
+     * @throws MaxcomputeConnectorException if the type, or an array element type, is unsupported
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            TypeInfo connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        switch (connectorDataType.getOdpsType()) {
+            case MAP:
+                MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
+                return new MapType(
+                        toSeaTunnelType(mapTypeInfo.getKeyTypeInfo(), dataTypeProperties),
+                        toSeaTunnelType(mapTypeInfo.getValueTypeInfo(), dataTypeProperties));
+            case ARRAY:
+                ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) connectorDataType;
+                switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
+                    case BOOLEAN:
+                        return ArrayType.BOOLEAN_ARRAY_TYPE;
+                    case INT:
+                        return ArrayType.INT_ARRAY_TYPE;
+                    case BIGINT:
+                        return ArrayType.LONG_ARRAY_TYPE;
+                    case FLOAT:
+                        return ArrayType.FLOAT_ARRAY_TYPE;
+                    case DOUBLE:
+                        return ArrayType.DOUBLE_ARRAY_TYPE;
+                    case STRING:
+                        return ArrayType.STRING_ARRAY_TYPE;
+                    default:
+                        throw new MaxcomputeConnectorException(
+                                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                                String.format(
+                                        "SeaTunnel type not support this type [%s] now",
+                                        connectorDataType.getTypeName()));
+                }
+            case STRUCT:
+                StructTypeInfo structTypeInfo = (StructTypeInfo) connectorDataType;
+                // Names come from the struct itself; a field's TypeInfo only knows its type.
+                List<String> structFieldNames = structTypeInfo.getFieldNames();
+                List<TypeInfo> structFieldInfos = structTypeInfo.getFieldTypeInfos();
+                SeaTunnelDataType<?>[] structFieldTypes =
+                        new SeaTunnelDataType<?>[structFieldInfos.size()];
+                for (int i = 0; i < structFieldInfos.size(); i++) {
+                    structFieldTypes[i] =
+                            toSeaTunnelType(structFieldInfos.get(i), dataTypeProperties);
+                }
+                return new SeaTunnelRowType(
+                        structFieldNames.toArray(new String[0]), structFieldTypes);
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case INT:
+                return BasicType.INT_TYPE;
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case DECIMAL:
+                DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) connectorDataType;
+                return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+            case VARCHAR:
+            case CHAR:
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case DATETIME:
+            case TIMESTAMP:
+                // Both carry a date and a time of day, so they need the date-time type.
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case VOID:
+                return BasicType.VOID_TYPE;
+            case INTERVAL_DAY_TIME:
+            case INTERVAL_YEAR_MONTH:
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "SeaTunnel type not support this type [%s] now",
+                                connectorDataType.getTypeName()));
+        }
+    }
+
+    /**
+     * Converts a SeaTunnel data type into the matching MaxCompute (ODPS)
+     * {@link TypeInfo}.
+     *
+     * <p>MAP, ARRAY and ROW types are converted recursively from their key,
+     * value, element or field types. Every other handled SQL type is mapped
+     * to a primitive or decimal type obtained from {@link TypeInfoFactory}.
+     *
+     * @param seaTunnelDataType the SeaTunnel type to convert
+     * @param dataTypeProperties not read here; only forwarded to the
+     *     recursive calls made for nested types
+     * @return the ODPS type info selected for the given SeaTunnel type
+     * @throws MaxcomputeConnectorException with
+     *     {@code UNSUPPORTED_DATA_TYPE} when the SQL type has no mapping
+     */
+    @Override
+    public TypeInfo toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> 
dataTypeProperties)
+            throws DataTypeConvertException {
+        switch (seaTunnelDataType.getSqlType()) {
+            case MAP:
+                MapType mapType = (MapType) seaTunnelDataType;
+                return TypeInfoFactory.getMapTypeInfo(
+                        toConnectorType(mapType.getKeyType(), 
dataTypeProperties),
+                        toConnectorType(mapType.getValueType(), 
dataTypeProperties));
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) seaTunnelDataType;
+                return TypeInfoFactory.getArrayTypeInfo(
+                        toConnectorType(arrayType.getElementType(), 
dataTypeProperties));
+            case ROW:
+                // A row becomes an ODPS struct; names and types are
+                // collected in field order so the two lists stay aligned.
+                SeaTunnelRowType rowType = (SeaTunnelRowType) 
seaTunnelDataType;
+                List<String> fieldNames = new 
ArrayList<>(rowType.getTotalFields());
+                List<TypeInfo> fieldTypes = new 
ArrayList<>(rowType.getTotalFields());
+                for (int i = 0; i < rowType.getTotalFields(); i++) {
+                    fieldNames.add(rowType.getFieldName(i));
+                    fieldTypes.add(toConnectorType(rowType.getFieldType(i), 
dataTypeProperties));
+                }
+                return TypeInfoFactory.getStructTypeInfo(fieldNames, 
fieldTypes);
+            case TINYINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT);
+            case SMALLINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT);
+            case INT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT);
+            case BIGINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT);
+            case BYTES:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY);
+            case FLOAT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT);
+            case DOUBLE:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE);
+            case DECIMAL:
+                // Precision and scale are carried over unchanged.
+                DecimalType decimalType = (DecimalType) seaTunnelDataType;
+                return TypeInfoFactory.getDecimalTypeInfo(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case STRING:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
+            case DATE:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE);
+            // NOTE(review): TIMESTAMP -> ODPS TIMESTAMP and TIME -> ODPS
+            // DATETIME do not round-trip with toSeaTunnelType in this class,
+            // which returns LOCAL_TIME_TYPE for ODPS TIMESTAMP and
+            // LOCAL_DATE_TIME_TYPE for ODPS DATETIME - confirm intent.
+            case TIMESTAMP:
+                return 
TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP);
+            case TIME:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME);
+            case BOOLEAN:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN);
+            case NULL:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.VOID);
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "Maxcompute type not support this type [%s] 
now",
+                                seaTunnelDataType.getSqlType()));
+        }
+    }
+
+    /**
+     * Returns the identifier of this convertor.
+     *
+     * @return {@link MaxcomputeConfig#PLUGIN_NAME}, the plugin name constant
+     *     also returned by the MaxCompute source and sink
+     */
+    @Override
+    public String getIdentity() {
+        return MaxcomputeConfig.PLUGIN_NAME;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
index 18aaafad41..84bbccddb7 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
@@ -23,6 +23,9 @@ import org.apache.seatunnel.api.configuration.Options;
 import java.io.Serializable;
 
 public class MaxcomputeConfig implements Serializable {
+
+    public static final String PLUGIN_NAME = "Maxcompute";
+
     private static final int SPLIT_ROW_DEFAULT = 10000;
     public static final Option<String> ACCESS_ID =
             Options.key("accessId")
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index cf6d639ed1..31c5ab0791 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
+
 @AutoService(SeaTunnelSink.class)
 public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     private static final Logger LOG = 
LoggerFactory.getLogger(MaxcomputeSink.class);
@@ -42,7 +44,7 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public String getPluginName() {
-        return "Maxcompute";
+        return PLUGIN_NAME;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index 5fbf1608d3..faa2f62910 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -28,6 +28,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
 
@@ -35,7 +36,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
 public class MaxcomputeSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "Maxcompute";
+        return PLUGIN_NAME;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index 3f6dd6cc76..417d660b86 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -23,7 +23,9 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
@@ -31,22 +33,30 @@ import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeM
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
+
 @Slf4j
 @AutoService(SeaTunnelSource.class)
 public class MaxcomputeSource
         implements SeaTunnelSource<SeaTunnelRow, MaxcomputeSourceSplit, 
MaxcomputeSourceState>,
-                SupportParallelism {
+                SupportParallelism,
+                SupportColumnProjection {
     private SeaTunnelRowType typeInfo;
     private Config pluginConfig;
 
     @Override
     public String getPluginName() {
-        return "Maxcompute";
+        return PLUGIN_NAME;
     }
 
     @Override
     public void prepare(Config pluginConfig) {
-        this.typeInfo = MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig);
+        if (pluginConfig.hasPath(SCHEMA.key())) {
+            this.typeInfo = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+        } else {
+            this.typeInfo = 
MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig);
+        }
         this.pluginConfig = pluginConfig;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
index f5b4fd03b7..a1600f106d 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
@@ -24,10 +24,12 @@ import 
org.apache.seatunnel.api.table.factory.TableSourceFactory;
 
 import com.google.auto.service.AutoService;
 
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@@ -36,14 +38,14 @@ import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
 public class MaxcomputeSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "Maxcompute";
+        return PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
-                .optional(PARTITION_SPEC, SPLIT_ROW)
+                .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index ac77214015..8e9eaf0785 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -20,15 +20,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeDataTypeConvertor;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
 
 import com.aliyun.odps.Column;
@@ -41,11 +38,7 @@ import com.aliyun.odps.data.Record;
 import com.aliyun.odps.data.SimpleStruct;
 import com.aliyun.odps.data.Varchar;
 import com.aliyun.odps.type.ArrayTypeInfo;
-import com.aliyun.odps.type.DecimalTypeInfo;
 import com.aliyun.odps.type.MapTypeInfo;
-import com.aliyun.odps.type.SimpleArrayTypeInfo;
-import com.aliyun.odps.type.SimpleMapTypeInfo;
-import com.aliyun.odps.type.SimpleStructTypeInfo;
 import com.aliyun.odps.type.StructTypeInfo;
 import com.aliyun.odps.type.TypeInfo;
 import lombok.extern.slf4j.Slf4j;
@@ -67,9 +60,9 @@ public class MaxcomputeTypeMapper implements Serializable {
 
     public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType 
typeInfo) {
         List<Object> fields = new ArrayList<>();
-        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
-        for (int i = 0; i < rs.getColumns().length; i++) {
-            fields.add(resolveObject2SeaTunnel(rs.get(i), 
seaTunnelDataTypes[i]));
+        for (int i = 0; i < typeInfo.getTotalFields(); i++) {
+            String typeName = typeInfo.getFieldName(i);
+            fields.add(resolveObject2SeaTunnel(rs.get(typeName), 
typeInfo.getFieldType(i)));
         }
         return new SeaTunnelRow(fields.toArray());
     }
@@ -92,11 +85,12 @@ public class MaxcomputeTypeMapper implements Serializable {
         ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
         ArrayList<String> fieldNames = new ArrayList<>();
         try {
+            MaxComputeDataTypeConvertor typeConvertor = new 
MaxComputeDataTypeConvertor();
             for (int i = 0; i < tableSchema.getColumns().size(); i++) {
                 fieldNames.add(tableSchema.getColumns().get(i).getName());
                 TypeInfo maxcomputeTypeInfo = 
tableSchema.getColumns().get(i).getTypeInfo();
                 SeaTunnelDataType<?> seaTunnelDataType =
-                        maxcompute2SeaTunnelType(maxcomputeTypeInfo);
+                        typeConvertor.toSeaTunnelType(maxcomputeTypeInfo, 
null);
                 seaTunnelDataTypes.add(seaTunnelDataType);
             }
         } catch (Exception e) {
@@ -107,88 +101,6 @@ public class MaxcomputeTypeMapper implements Serializable {
                 seaTunnelDataTypes.toArray(new 
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
     }
 
-    private static SeaTunnelDataType<?> maxcompute2SeaTunnelType(TypeInfo 
typeInfo) {
-        switch (typeInfo.getOdpsType()) {
-            case MAP:
-                MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
-                return new MapType(
-                        maxcompute2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()),
-                        
maxcompute2SeaTunnelType(mapTypeInfo.getValueTypeInfo()));
-            case ARRAY:
-                ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) typeInfo;
-                switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
-                    case BOOLEAN:
-                        return ArrayType.BOOLEAN_ARRAY_TYPE;
-                    case INT:
-                        return ArrayType.INT_ARRAY_TYPE;
-                    case BIGINT:
-                        return ArrayType.LONG_ARRAY_TYPE;
-                    case FLOAT:
-                        return ArrayType.FLOAT_ARRAY_TYPE;
-                    case DOUBLE:
-                        return ArrayType.DOUBLE_ARRAY_TYPE;
-                    case STRING:
-                        return ArrayType.STRING_ARRAY_TYPE;
-                    default:
-                        throw new MaxcomputeConnectorException(
-                                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                                String.format(
-                                        "SeaTunnel type not support this type 
[%s] now",
-                                        typeInfo.getTypeName()));
-                }
-            case STRUCT:
-                StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
-                List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos();
-                List<String> fieldNames = new ArrayList<>(fields.size());
-                List<SeaTunnelDataType<?>> fieldTypes = new 
ArrayList<>(fields.size());
-                for (TypeInfo field : fields) {
-                    fieldNames.add(field.getTypeName());
-                    fieldTypes.add(maxcompute2SeaTunnelType(field));
-                }
-                return new SeaTunnelRowType(
-                        fieldNames.toArray(new String[0]),
-                        fieldTypes.toArray(new SeaTunnelDataType[0]));
-            case TINYINT:
-                return BasicType.BYTE_TYPE;
-            case SMALLINT:
-                return BasicType.SHORT_TYPE;
-            case INT:
-                return BasicType.INT_TYPE;
-            case BIGINT:
-                return BasicType.LONG_TYPE;
-            case BINARY:
-                return PrimitiveByteArrayType.INSTANCE;
-            case FLOAT:
-                return BasicType.FLOAT_TYPE;
-            case DOUBLE:
-                return BasicType.DOUBLE_TYPE;
-            case DECIMAL:
-                DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
-                return new DecimalType(decimalTypeInfo.getPrecision(), 
decimalTypeInfo.getScale());
-            case VARCHAR:
-            case CHAR:
-            case STRING:
-                return BasicType.STRING_TYPE;
-            case DATE:
-                return LocalTimeType.LOCAL_DATE_TYPE;
-            case DATETIME:
-            case TIMESTAMP:
-                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
-            case BOOLEAN:
-                return BasicType.BOOLEAN_TYPE;
-            case VOID:
-                return BasicType.VOID_TYPE;
-            case INTERVAL_DAY_TIME:
-            case INTERVAL_YEAR_MONTH:
-            default:
-                throw new MaxcomputeConnectorException(
-                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
-                        String.format(
-                                "SeaTunnel type not support this type [%s] 
now",
-                                typeInfo.getTypeName()));
-        }
-    }
-
     private static Object resolveObject2SeaTunnel(Object field, 
SeaTunnelDataType<?> fieldType) {
         if (field == null) {
             return null;
@@ -245,11 +157,10 @@ public class MaxcomputeTypeMapper implements Serializable 
{
             case DOUBLE:
             case BIGINT:
             case BOOLEAN:
+            case DECIMAL:
                 return field;
             case BYTES:
                 return ((Binary) field).data();
-            case DECIMAL:
-                return null;
             case STRING:
                 if (field instanceof byte[]) {
                     return new String((byte[]) field);
@@ -288,7 +199,7 @@ public class MaxcomputeTypeMapper implements Serializable {
             case ARRAY:
                 ArrayList<Object> origArray = new ArrayList<>();
                 Arrays.stream((Object[]) 
field).iterator().forEachRemaining(origArray::add);
-                switch (((SimpleArrayTypeInfo) 
typeInfo).getElementTypeInfo().getOdpsType()) {
+                switch (((ArrayTypeInfo) 
typeInfo).getElementTypeInfo().getOdpsType()) {
                     case STRING:
                     case BOOLEAN:
                     case INT:
@@ -305,8 +216,8 @@ public class MaxcomputeTypeMapper implements Serializable {
                 }
             case MAP:
                 HashMap<Object, Object> dataMap = new HashMap<>();
-                TypeInfo keyTypeInfo = ((SimpleMapTypeInfo) 
typeInfo).getKeyTypeInfo();
-                TypeInfo valueTypeInfo = ((SimpleMapTypeInfo) 
typeInfo).getValueTypeInfo();
+                TypeInfo keyTypeInfo = ((MapTypeInfo) 
typeInfo).getKeyTypeInfo();
+                TypeInfo valueTypeInfo = ((MapTypeInfo) 
typeInfo).getValueTypeInfo();
                 HashMap<Object, Object> origDataMap = (HashMap<Object, 
Object>) field;
                 origDataMap.forEach(
                         (key, value) ->
@@ -316,7 +227,7 @@ public class MaxcomputeTypeMapper implements Serializable {
                 return origDataMap;
             case STRUCT:
                 Object[] fields = ((SeaTunnelRow) field).getFields();
-                List<TypeInfo> typeInfos = ((SimpleStructTypeInfo) 
typeInfo).getFieldTypeInfos();
+                List<TypeInfo> typeInfos = ((StructTypeInfo) 
typeInfo).getFieldTypeInfos();
                 ArrayList<Object> origStruct = new ArrayList<>();
                 for (int i = 0; i < fields.length; i++) {
                     origStruct.add(resolveObject2Maxcompute(fields[i], 
typeInfos.get(i)));
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
new file mode 100644
index 0000000000..0af30301ca
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.aliyun.odps.type.VarcharTypeInfo;
+
+/**
+ * Unit tests for {@link MaxComputeDataTypeConvertor}: conversion between
+ * MaxCompute (ODPS) type descriptions and SeaTunnel data types, in both
+ * directions, plus the convertor identity.
+ */
+public class MaxComputeDataTypeConvertorTest {
+
+    private final MaxComputeDataTypeConvertor maxComputeDataTypeConvertor =
+            new MaxComputeDataTypeConvertor();
+
+    /** A textual ODPS map type must yield string key and value types. */
+    @Test
+    public void testTypeInfoStrToSeaTunnelType() {
+        String typeInfoStr = "MAP<STRING,STRING>";
+        SeaTunnelDataType<?> seaTunnelType =
+                maxComputeDataTypeConvertor.toSeaTunnelType(typeInfoStr);
+        Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) 
seaTunnelType).getKeyType());
+        // Check the value side too; asserting the key type twice would
+        // leave the map value conversion untested.
+        Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) 
seaTunnelType).getValueType());
+    }
+
+    /** An ODPS map of VARCHAR(10) must convert to a map of strings. */
+    @Test
+    public void testTypeInfoToSeaTunnelType() {
+        MapTypeInfo simpleMapTypeInfo =
+                TypeInfoFactory.getMapTypeInfo(new VarcharTypeInfo(10), new 
VarcharTypeInfo(10));
+        MapType seaTunnelMapType =
+                (MapType) 
maxComputeDataTypeConvertor.toSeaTunnelType(simpleMapTypeInfo, null);
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
seaTunnelMapType.getKeyType());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
seaTunnelMapType.getValueType());
+    }
+
+    /** A SeaTunnel map of strings must convert to an ODPS STRING map. */
+    @Test
+    public void testSeaTunnelTypeToTypeInfo() {
+        MapType mapType = new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE);
+        MapTypeInfo mapTypeInfo =
+                (MapTypeInfo) 
maxComputeDataTypeConvertor.toConnectorType(mapType, null);
+        Assertions.assertEquals(OdpsType.STRING, 
mapTypeInfo.getKeyTypeInfo().getOdpsType());
+        Assertions.assertEquals(OdpsType.STRING, 
mapTypeInfo.getValueTypeInfo().getOdpsType());
+    }
+
+    /** The convertor identity must equal the shared plugin name. */
+    @Test
+    public void getIdentity() {
+        Assertions.assertEquals(
+                MaxcomputeConfig.PLUGIN_NAME, 
maxComputeDataTypeConvertor.getIdentity());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
new file mode 100644
index 0000000000..9d394775d1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for {@link MaxcomputeSource#prepare}: when the plugin config
+ * contains a {@code schema} block, the produced row type is built from it.
+ */
+public class MaxcomputeSourceTest {
+
+    /**
+     * Prepares the source with a config holding only {@code schema.fields}
+     * and checks that this does not throw and that the first produced field
+     * has SQL type INT.
+     */
+    @Test
+    public void prepare() {
+        Config fields =
+                ConfigFactory.empty()
+                        .withValue("id", ConfigValueFactory.fromAnyRef("int"))
+                        .withValue("name", 
ConfigValueFactory.fromAnyRef("string"))
+                        .withValue("age", 
ConfigValueFactory.fromAnyRef("int"));
+
+        // Nest the field definitions under schema.fields.
+        Config schema = fields.atKey("fields").atKey("schema");
+
+        MaxcomputeSource maxcomputeSource = new MaxcomputeSource();
+        Assertions.assertDoesNotThrow(() -> maxcomputeSource.prepare(schema));
+
+        // Only the field at index 0 is verified here.
+        SeaTunnelRowType seaTunnelRowType = maxcomputeSource.getProducedType();
+        Assertions.assertEquals(SqlType.INT, 
seaTunnelRowType.getFieldType(0).getSqlType());
+    }
+}

Reply via email to